You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/12 22:29:32 UTC
[4/5] git commit: FALCON-661 Add list types to Lineage API.
Contributed by Balu Vellanki
FALCON-661 Add list types to Lineage API. Contributed by Balu Vellanki
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/7f98570b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/7f98570b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/7f98570b
Branch: refs/heads/master
Commit: 7f98570bb736dc7307d385271300568f185d3ffe
Parents: 42a1b76
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Sep 12 13:27:34 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Sep 12 13:27:34 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../java/org/apache/falcon/cli/FalconCLI.java | 6 +
.../apache/falcon/cli/FalconMetadataCLI.java | 99 +++++++
.../org/apache/falcon/client/FalconClient.java | 43 +++
.../falcon/metadata/RelationshipType.java | 64 +++++
.../falcon/metadata/RelationshipType.java | 64 -----
common/src/main/resources/startup.properties | 1 +
docs/src/site/twiki/FalconCLI.twiki | 16 ++
docs/src/site/twiki/restapi/MetadataList.twiki | 30 ++
docs/src/site/twiki/restapi/ResourceList.twiki | 6 +
.../metadata/AbstractMetadataResource.java | 57 ++++
.../metadata/LineageMetadataResource.java | 33 +--
.../metadata/MetadataDiscoveryResource.java | 159 +++++++++++
.../metadata/LineageMetadataResourceTest.java | 277 +++----------------
.../metadata/MetadataDiscoveryResourceTest.java | 152 ++++++++++
.../resource/metadata/MetadataTestContext.java | 231 ++++++++++++++++
.../java/org/apache/falcon/cli/FalconCLIIT.java | 37 +++
.../org/apache/falcon/process/PigProcessIT.java | 2 +-
.../falcon/process/TableStorageProcessIT.java | 2 +-
.../resource/MetadataResourceJerseyIT.java | 104 +++++++
.../org/apache/falcon/resource/TestContext.java | 14 +-
21 files changed, 1066 insertions(+), 334 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5764c47..f1132d9 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,9 @@ Trunk (Unreleased)
FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
IMPROVEMENTS
+ FALCON-661 Add list types to Lineage API (Balu Vellanki via
+ Venkatesh Seetharam)
+
FALCON-654 Exclude junit dependency in pom (Ruslan Ostafiychuk)
FALCON-640 Add ability to specify sort order for orderBy param in RestAPI
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index fc2236b..f7229ec 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -59,6 +59,7 @@ public class FalconCLI {
public static final String STATUS_OPTION = "status";
public static final String ADMIN_CMD = "admin";
public static final String HELP_CMD = "help";
+ public static final String METADATA_CMD = "metadata";
private static final String VERSION_CMD = "version";
private static final String STACK_OPTION = "stack";
@@ -162,6 +163,7 @@ public class FalconCLI {
public synchronized int run(final String[] args) {
CLIParser parser = new CLIParser("falcon", FALCON_HELP);
+ FalconMetadataCLI metadataCLI = new FalconMetadataCLI();
parser.addCommand(ADMIN_CMD, "", "admin operations", createAdminOptions(), true);
parser.addCommand(HELP_CMD, "", "display usage", new Options(), false);
@@ -173,6 +175,8 @@ public class FalconCLI {
"Process instances operations like running, status, kill, suspend, resume, rerun, logs",
instanceOptions(), false);
parser.addCommand(GRAPH_CMD, "", "graph operations", createGraphOptions(), true);
+ parser.addCommand(METADATA_CMD, "", "Metadata operations like list, relations",
+ metadataCLI.createMetadataOptions(), true);
parser.addCommand(RECIPE_CMD, "", "recipe operations", createRecipeOptions(), true);
try {
@@ -193,6 +197,8 @@ public class FalconCLI {
instanceCommand(commandLine, client);
} else if (command.getName().equals(GRAPH_CMD)) {
graphCommand(commandLine, client);
+ } else if (command.getName().equals(METADATA_CMD)) {
+ metadataCLI.metadataCommand(commandLine, client);
} else if (command.getName().equals(RECIPE_CMD)) {
recipeCommand(commandLine, client);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
new file mode 100644
index 0000000..d3956b0
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.client.FalconCLIException;
+import org.apache.falcon.client.FalconClient;
+import org.apache.falcon.metadata.RelationshipType;
+
+import java.io.PrintStream;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Metadata extension to Falcon Command Line Interface - wraps the RESTful API for Metadata.
+ */
+public class FalconMetadataCLI {
+
+ public static final AtomicReference<PrintStream> OUT = new AtomicReference<PrintStream>(System.out);
+
+ public static final String LIST_OPT = "list";
+
+ public static final String URL_OPTION = "url";
+ public static final String TYPE_OPT = "type";
+ public static final String CLUSTER_OPT = "cluster";
+
+ public FalconMetadataCLI() {}
+
+ public void metadataCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException {
+ Set<String> optionsList = new HashSet<String>();
+ for (Option option : commandLine.getOptions()) {
+ optionsList.add(option.getOpt());
+ }
+
+ String result = null;
+ String dimensionType = commandLine.getOptionValue(TYPE_OPT);
+ String cluster = commandLine.getOptionValue(CLUSTER_OPT);
+
+ validateDimensionType(dimensionType.toUpperCase());
+
+ if (optionsList.contains(LIST_OPT)) {
+ result = client.getDimensionList(dimensionType, cluster);
+ }
+
+ OUT.get().println(result);
+ }
+
+ private void validateDimensionType(String dimensionType) throws FalconCLIException {
+ if (StringUtils.isEmpty(dimensionType)
+ || dimensionType.contains("INSTANCE")) {
+ throw new FalconCLIException("Invalid value provided for queryParam \"type\" " + dimensionType);
+ }
+ try {
+ RelationshipType.valueOf(dimensionType);
+ } catch (IllegalArgumentException iae) {
+ throw new FalconCLIException("Invalid value provided for queryParam \"type\" " + dimensionType);
+ }
+ }
+
+ public Options createMetadataOptions() {
+ Options metadataOptions = new Options();
+
+ OptionGroup group = new OptionGroup();
+ Option list = new Option(LIST_OPT, false, "List all dimensions");
+ group.addOption(list);
+
+ Option url = new Option(URL_OPTION, true, "Falcon URL");
+ Option type = new Option(TYPE_OPT, true, "Dimension type");
+ Option cluster = new Option(CLUSTER_OPT, true, "Cluster name");
+
+ metadataOptions.addOptionGroup(group);
+ metadataOptions.addOption(url);
+ metadataOptions.addOption(type);
+ metadataOptions.addOption(cluster);
+
+ return metadataOptions;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index a63a76b..205ff31 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.net.util.TrustManagerUtils;
import org.apache.falcon.LifeCycle;
+import org.apache.falcon.cli.FalconMetadataCLI;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.recipe.RecipeTool;
@@ -203,6 +204,24 @@ public class FalconClient {
}
/**
+ * Methods allowed on Metadata Resources.
+ */
+ protected static enum MetadataOperations {
+
+ LIST("api/metadata/", HttpMethod.GET, MediaType.APPLICATION_JSON);
+
+ private String path;
+ private String method;
+ private String mimeType;
+
+ MetadataOperations(String path, String method, String mimeType) {
+ this.path = path;
+ this.method = method;
+ this.mimeType = mimeType;
+ }
+ }
+
+ /**
* Methods allowed on Process Instance Resources.
*/
protected static enum Instances {
@@ -484,6 +503,10 @@ public class FalconClient {
return clientResponse.getStatus();
}
+ public String getDimensionList(String dimensionType, String cluster) throws FalconCLIException {
+ return sendMetadataListRequest(MetadataOperations.LIST, dimensionType, cluster);
+ }
+
/**
* Converts a InputStream into ServletInputStream.
*
@@ -765,6 +788,26 @@ public class FalconClient {
return parseStringResult(clientResponse);
}
+ private String sendMetadataListRequest(final MetadataOperations op, final String dimensionType,
+ final String cluster) throws FalconCLIException {
+ WebResource resource = service
+ .path(op.path)
+ .path(dimensionType)
+ .path(FalconMetadataCLI.LIST_OPT);
+
+ if (!StringUtils.isEmpty(cluster)) {
+ resource = resource.queryParam(FalconMetadataCLI.CLUSTER_OPT, cluster);
+ }
+
+ ClientResponse clientResponse = resource
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(op.mimeType).type(op.mimeType)
+ .method(op.method, ClientResponse.class);
+
+ checkIfSuccessful(clientResponse);
+ return parseStringResult(clientResponse);
+ }
+
private String parseAPIResult(ClientResponse clientResponse)
throws FalconCLIException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
new file mode 100644
index 0000000..f034772
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.metadata;
+
+/**
+ * Enumerates Relationship types.
+ */
+public enum RelationshipType {
+
+ // entity vertex types
+ CLUSTER_ENTITY("cluster-entity"),
+ FEED_ENTITY("feed-entity"),
+ PROCESS_ENTITY("process-entity"),
+
+ // instance vertex types
+ FEED_INSTANCE("feed-instance"),
+ PROCESS_INSTANCE("process-instance"),
+
+ // Misc vertex types
+ USER("user"),
+ COLO("data-center"),
+ TAGS("classification"),
+ GROUPS("group"),
+ PIPELINES("pipelines");
+
+
+ private final String name;
+
+ RelationshipType(java.lang.String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public static RelationshipType fromString(String value) {
+ if (value != null) {
+ for (RelationshipType type : RelationshipType.values()) {
+ if (value.equals(type.getName())) {
+ return type;
+ }
+ }
+ }
+
+ throw new IllegalArgumentException("No constant with value " + value + " found");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
deleted file mode 100644
index f034772..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-/**
- * Enumerates Relationship types.
- */
-public enum RelationshipType {
-
- // entity vertex types
- CLUSTER_ENTITY("cluster-entity"),
- FEED_ENTITY("feed-entity"),
- PROCESS_ENTITY("process-entity"),
-
- // instance vertex types
- FEED_INSTANCE("feed-instance"),
- PROCESS_INSTANCE("process-instance"),
-
- // Misc vertex types
- USER("user"),
- COLO("data-center"),
- TAGS("classification"),
- GROUPS("group"),
- PIPELINES("pipelines");
-
-
- private final String name;
-
- RelationshipType(java.lang.String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
- public static RelationshipType fromString(String value) {
- if (value != null) {
- for (RelationshipType type : RelationshipType.values()) {
- if (value.equals(type.getName())) {
- return type;
- }
- }
- }
-
- throw new IllegalArgumentException("No constant with value " + value + " found");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 314146c..e233b2a 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -36,6 +36,7 @@
org.apache.falcon.entity.store.ConfigurationStore,\
org.apache.falcon.rerun.service.RetryService,\
org.apache.falcon.rerun.service.LateRunService,\
+ org.apache.falcon.metadata.MetadataMappingService,\
org.apache.falcon.service.LogCleanupService
##### Falcon Configuration Store Change listeners #####
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index afa8136..d4ce366 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -301,6 +301,22 @@ Status returns the current state of Falcon (running or stopped).
Usage:
$FALCON_HOME/bin/falcon admin -status
+---++ Metadata Options
+
+---+++ List
+
+List all dimensions of type.
+
+Usage:
+$FALCON_HOME/bin/falcon metadata -list -type [cluster_entity|feed_entity|process_entity|user|colo|tags|groups|pipelines]
+
+Optional Args : -cluster <<cluster name>>
+
+Lists of all dimensions of given type. If the user provides optional param cluster, only the dimensions related to the cluster are listed.
+Example:
+$FALCON_HOME/bin/falcon metadata -list -type process_entity -cluster primary-cluster
+$FALCON_HOME/bin/falcon metadata -list -type tags
+
---++ Recipe Options
---+++ Submit Recipe
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/docs/src/site/twiki/restapi/MetadataList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/MetadataList.twiki b/docs/src/site/twiki/restapi/MetadataList.twiki
new file mode 100644
index 0000000..b6da203
--- /dev/null
+++ b/docs/src/site/twiki/restapi/MetadataList.twiki
@@ -0,0 +1,30 @@
+---++ GET api/metadata/:type/list
+ * <a href="#Description">Description</a>
+ * <a href="#Parameters">Parameters</a>
+ * <a href="#Results">Results</a>
+ * <a href="#Examples">Examples</a>
+
+---++ Description
+Get all dimensions of specified type.
+
+---++ Parameters
+ * :type Valid dimension types are cluster_entity,feed_entity, process_entity, user, colo, tags, groups, pipelines
+ * cluster <optional query param> Show dimensions related to this cluster.
+
+
+---++ Results
+List of dimensions that match requested type [and cluster].
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/metadata/process_entity/list?cluster=primary-cluster
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "results": ["sampleIngestProcess","testProcess","anotherProcess"],
+ "totalSize": 3
+}
+</verbatim>
+
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index d9cb3cb..34a07d5 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -5,6 +5,7 @@
* <a href="#REST_Call_on_Feed_and_Process_Instances">REST Call on Feed/Process Instances</a>
* <a href="#REST_Call_on_Admin_Resource">REST Call on Admin Resource</a>
* <a href="#REST_Call_on_Lineage_Graph">REST Call on Lineage Graph Resource</a>
+ * <a href="#REST_Call_on_Metadata_Resource">REST Call on Metadata Resource</a>
---++ Authentication
@@ -76,3 +77,8 @@ See also: [[../Security.twiki][Security in Falcon]]
| GET | [[AdjacentVertices][api/graphs/lineage/vertices/:id/:direction]] | get the adjacent vertices or edges of the vertex with the specified direction |
| GET | [[AllEdges][api/graphs/lineage//edges/all]] | get all edges |
| GET | [[Edge][api/graphs/lineage/edges/:id]] | get the edge with the specified id |
+
+---++ REST Call on Metadata Resource
+
+| *Call Type* | *Resource* | *Description* |
+| GET | [[MetadataList][api/metadata/:dimension-type/list]] | list of dimensions |
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java
new file mode 100644
index 0000000..a6c9b78
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.metadata;
+
+import com.tinkerpop.blueprints.Graph;
+import org.apache.falcon.metadata.MetadataMappingService;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.service.Services;
+
+import java.util.Set;
+
+/**
+ * A base class for managing Metadata operations.
+]*/
+public abstract class AbstractMetadataResource {
+ public static final String RESULTS = "results";
+ public static final String TOTAL_SIZE = "totalSize";
+
+ protected MetadataMappingService service;
+
+ public AbstractMetadataResource() {
+ service = (MetadataMappingService)getService(MetadataMappingService.SERVICE_NAME);
+ }
+
+ public static FalconService getService(String serviceName) {
+ return (Services.get().isRegistered(serviceName))
+ ? Services.get().getService(serviceName) : null;
+ }
+
+ protected Graph getGraph() {
+ return service.getGraph();
+ }
+
+ protected Set<String> getVertexIndexedKeys() {
+ return service.getVertexIndexedKeys();
+ }
+
+ protected Set<String> getEdgeIndexedKeys() {
+ return service.getEdgeIndexedKeys();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
index cf6b6b1..2181bea 100644
--- a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
+++ b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
@@ -21,24 +21,21 @@ package org.apache.falcon.resource.metadata;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
-import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.VertexQuery;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.metadata.GraphUtils;
-import org.apache.falcon.metadata.MetadataMappingService;
import org.apache.falcon.metadata.RelationshipLabel;
import org.apache.falcon.metadata.RelationshipProperty;
import org.apache.falcon.metadata.RelationshipType;
-import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.StringUtils;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@@ -51,7 +48,6 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
/**
* Jersey Resource for lineage metadata operations.
@@ -59,35 +55,10 @@ import java.util.Set;
* https://github.com/tinkerpop/rexster/wiki/Basic-REST-API
*/
@Path("graphs/lineage")
-public class LineageMetadataResource {
+public class LineageMetadataResource extends AbstractMetadataResource {
private static final Logger LOG = LoggerFactory.getLogger(LineageMetadataResource.class);
- public static final String RESULTS = "results";
- public static final String TOTAL_SIZE = "totalSize";
-
- private final MetadataMappingService service;
-
- public LineageMetadataResource() {
- if (Services.get().isRegistered(MetadataMappingService.SERVICE_NAME)) {
- service = Services.get().getService(MetadataMappingService.SERVICE_NAME);
- } else {
- service = null;
- }
- }
-
- private Graph getGraph() {
- return service.getGraph();
- }
-
- private Set<String> getVertexIndexedKeys() {
- return service.getVertexIndexedKeys();
- }
-
- private Set<String> getEdgeIndexedKeys() {
- return service.getEdgeIndexedKeys();
- }
-
/**
* Dump the graph.
*
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java
new file mode 100644
index 0000000..d15d7cc
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.metadata;
+
+import com.tinkerpop.blueprints.Direction;
+import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.GraphQuery;
+import com.tinkerpop.blueprints.Vertex;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.metadata.RelationshipLabel;
+import org.apache.falcon.metadata.RelationshipProperty;
+import org.apache.falcon.metadata.RelationshipType;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.Iterator;
+
+/**
+ * Jersey Resource for metadata operations.
+ */
+@Path("metadata")
+public class MetadataDiscoveryResource extends AbstractMetadataResource {
+
+ /**
+ * Get list of dimensions for the given dimension-type.
+ * <p/>
+ * GET http://host/metadata/dimension-type/list
+ */
+ @GET
+ @Path("/{type}/list")
+ @Produces({MediaType.APPLICATION_JSON})
+ public Response listDimensionValues(@PathParam("type") String type,
+ @QueryParam("cluster") final String clusterName) {
+ RelationshipType relationshipType = validateAndParseDimensionType(type.toUpperCase());
+ GraphQuery query = getGraph().query();
+ JSONArray dimensionValues = new JSONArray();
+
+ if (StringUtils.isEmpty(clusterName)) {
+ query = query.has(RelationshipProperty.TYPE.getName(), relationshipType.getName());
+ dimensionValues = getDimensionsFromVertices(dimensionValues,
+ query.vertices().iterator());
+ } else {
+ // Get clusterVertex, get adjacent vertices of clusterVertex that match dimension type.
+ query = query
+ .has(RelationshipProperty.TYPE.getName(), RelationshipType.CLUSTER_ENTITY.getName())
+ .has(RelationshipProperty.NAME.getName(), clusterName);
+ Iterator<Vertex> clusterIterator = query.vertices().iterator();
+ if (clusterIterator.hasNext()) {
+ dimensionValues = getDimensionsFromClusterVertex(
+ dimensionValues, clusterIterator.next(), relationshipType);
+ } // else, no cluster found. Return empty results
+ }
+
+ try {
+ JSONObject response = new JSONObject();
+ response.put(RESULTS, dimensionValues);
+ response.put(TOTAL_SIZE, dimensionValues.length());
+ return Response.ok(response).build();
+ } catch (JSONException e) {
+ throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build());
+ }
+ }
+
+ private JSONArray getDimensionsFromClusterVertex(JSONArray dimensionValues, Vertex clusterVertex,
+ RelationshipType relationshipType) {
+ switch (relationshipType) {
+ case FEED_ENTITY:
+ return getDimensionsFromEdges(dimensionValues, Direction.OUT,
+ clusterVertex.getEdges(
+ Direction.IN, RelationshipLabel.FEED_CLUSTER_EDGE.getName()).iterator(),
+ relationshipType);
+
+ case PROCESS_ENTITY:
+ return getDimensionsFromEdges(dimensionValues, Direction.OUT,
+ clusterVertex.getEdges(
+ Direction.IN, RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()).iterator(),
+ relationshipType);
+
+ case COLO:
+ return getDimensionsFromEdges(dimensionValues, Direction.IN,
+ clusterVertex.getEdges(
+ Direction.OUT, RelationshipLabel.CLUSTER_COLO.getName()).iterator(),
+ relationshipType);
+
+ case CLUSTER_ENTITY:
+ return getDimensionFromVertex(dimensionValues, clusterVertex);
+
+ default:
+ return dimensionValues;
+ }
+ }
+
+ private JSONArray getDimensionsFromVertices(JSONArray dimensionValues,
+ Iterator<Vertex> vertexIterator) {
+ while (vertexIterator.hasNext()) {
+ dimensionValues = getDimensionFromVertex(dimensionValues, vertexIterator.next());
+ }
+
+ return dimensionValues;
+ }
+
+ private JSONArray getDimensionsFromEdges(JSONArray dimensionValues, Direction direction,
+ Iterator<Edge> edgeIterator,
+ RelationshipType relationshipType) {
+ while(edgeIterator.hasNext()) {
+ Vertex vertex = edgeIterator.next().getVertex(direction);
+ if (vertex.getProperty(RelationshipProperty.TYPE.getName())
+ .equals(relationshipType.getName())) {
+ dimensionValues = getDimensionFromVertex(dimensionValues, vertex);
+ }
+ }
+
+ return dimensionValues;
+ }
+
+ private JSONArray getDimensionFromVertex(JSONArray dimensionValues, Vertex vertex) {
+ return dimensionValues.put(vertex.getProperty(RelationshipProperty.NAME.getName()));
+ }
+
+ private RelationshipType validateAndParseDimensionType(String type) {
+ if (StringUtils.isEmpty(type) || type.contains("INSTANCE")) {
+ throw new WebApplicationException(Response.status(Response.Status.BAD_REQUEST)
+ .entity("Invalid Dimension type : " + type).type("text/plain").build());
+ }
+
+ try {
+ return RelationshipType.valueOf(type);
+ } catch (IllegalArgumentException iae) {
+ throw new WebApplicationException(Response.status(Response.Status.BAD_REQUEST)
+ .entity("Invalid Dimension type : " + type).type("text/plain").build());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
index 9bef7f5..0a8efc9 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
@@ -22,26 +22,10 @@ import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.Vertex;
-import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.metadata.MetadataMappingService;
import org.apache.falcon.metadata.RelationshipLabel;
import org.apache.falcon.metadata.RelationshipProperty;
-import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
import org.json.simple.JSONValue;
import org.testng.Assert;
@@ -55,7 +39,6 @@ import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -63,63 +46,18 @@ import java.util.Map;
* Unit tests for org.apache.falcon.resource.metadata.LineageMetadataResource.
*/
public class LineageMetadataResourceTest {
- public static final String FALCON_USER = "falcon-user";
- private static final String LOGS_DIR = "target/log";
- private static final String NOMINAL_TIME = "2014-01-01-01-00";
- public static final String OPERATION = "GENERATE";
-
- public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
- public static final String PROCESS_ENTITY_NAME = "sample-process";
- public static final String COLO_NAME = "west-coast";
- public static final String WORKFLOW_NAME = "imp-click-join-workflow";
- public static final String WORKFLOW_VERSION = "1.0.9";
-
- public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
- public static final String INPUT_INSTANCE_PATHS =
- "jail://global:00/falcon/impression-feed/20140101#jail://global:00/falcon/clicks-feed/20140101";
-
- public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
- public static final String OUTPUT_INSTANCE_PATHS =
- "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
-
- private ConfigurationStore configStore;
- private MetadataMappingService service;
-
- private Cluster clusterEntity;
- private List<Feed> inputFeeds = new ArrayList<Feed>();
- private List<Feed> outputFeeds = new ArrayList<Feed>();
+ private MetadataTestContext testContext;
@BeforeClass
public void setUp() throws Exception {
- CurrentUser.authenticate(FALCON_USER);
-
- configStore = ConfigurationStore.get();
-
- Services.get().register(new WorkflowJobEndNotificationService());
- Assert.assertTrue(Services.get().isRegistered(WorkflowJobEndNotificationService.SERVICE_NAME));
-
- StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
- service = new MetadataMappingService();
- service.init();
- Services.get().register(service);
- Assert.assertTrue(Services.get().isRegistered(MetadataMappingService.SERVICE_NAME));
-
- addClusterEntity();
- addFeedEntity();
- addProcessEntity();
- addInstance();
+ testContext = new MetadataTestContext();
+ testContext.setUp();
}
@AfterClass
public void tearDown() throws Exception {
- cleanupGraphStore(service.getGraph());
- cleanupConfigurationStore(configStore);
-
- service.destroy();
-
- StartupProperties.get().setProperty("falcon.graph.preserve.history", "false");
- Services.get().reset();
+ testContext.tearDown();
}
@Test (expectedExceptions = WebApplicationException.class)
@@ -132,10 +70,10 @@ public class LineageMetadataResourceTest {
@Test
public void testGetVerticesWithId() throws Exception {
- Vertex vertex = service.getGraph().getVertices(RelationshipProperty.NAME.getName(),
- PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z").iterator().next();
-
LineageMetadataResource resource = new LineageMetadataResource();
+ Vertex vertex = resource.getGraph().getVertices(RelationshipProperty.NAME.getName(),
+ MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z").iterator().next();
+
Response response = resource.getVertex(String.valueOf(vertex.getId()));
Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
@@ -147,11 +85,11 @@ public class LineageMetadataResourceTest {
@Test
public void testGetVertexPropertiesForProcessInstance() throws Exception {
- String processInstance = PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
- Vertex vertex = service.getGraph().getVertices(
+ String processInstance = MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
+ LineageMetadataResource resource = new LineageMetadataResource();
+ Vertex vertex = resource.getGraph().getVertices(
RelationshipProperty.NAME.getName(), processInstance).iterator().next();
- LineageMetadataResource resource = new LineageMetadataResource();
Response response = resource.getVertexProperties(String.valueOf(vertex.getId()), "true");
Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
@@ -166,10 +104,10 @@ public class LineageMetadataResourceTest {
@Test
public void testGetVertexPropertiesForFeedInstance() throws Exception {
String feedInstance = "impression-feed/2014-01-01T00:00Z";
- Vertex vertex = service.getGraph().getVertices(
+ LineageMetadataResource resource = new LineageMetadataResource();
+ Vertex vertex = resource.getGraph().getVertices(
RelationshipProperty.NAME.getName(), feedInstance).iterator().next();
- LineageMetadataResource resource = new LineageMetadataResource();
Response response = resource.getVertexProperties(String.valueOf(vertex.getId()), "true");
Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
@@ -183,10 +121,10 @@ public class LineageMetadataResourceTest {
@Test
public void testGetVertexPropertiesForFeedInstanceNoTags() throws Exception {
String feedInstance = "clicks-feed/2014-01-01T00:00Z";
- Vertex vertex = service.getGraph().getVertices(
+ LineageMetadataResource resource = new LineageMetadataResource();
+ Vertex vertex = resource.getGraph().getVertices(
RelationshipProperty.NAME.getName(), feedInstance).iterator().next();
- LineageMetadataResource resource = new LineageMetadataResource();
Response response = resource.getVertexProperties(String.valueOf(vertex.getId()), "true");
Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
@@ -199,11 +137,11 @@ public class LineageMetadataResourceTest {
@Test
public void testGetVerticesWithKeyValue() throws Exception {
- String processInstance = PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
- Vertex vertex = service.getGraph().getVertices(
+ String processInstance = MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
+ LineageMetadataResource resource = new LineageMetadataResource();
+ Vertex vertex = resource.getGraph().getVertices(
RelationshipProperty.NAME.getName(), processInstance).iterator().next();
- LineageMetadataResource resource = new LineageMetadataResource();
Response response = resource.getVertices(RelationshipProperty.NAME.getName(), processInstance);
Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
@@ -227,14 +165,17 @@ public class LineageMetadataResourceTest {
@Test
public void testVertexEdgesForIdAndDirectionOut() throws Exception {
- String processInstance = PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
- Vertex vertex = service.getGraph().getVertices(
+ String processInstance = MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
+ LineageMetadataResource resource = new LineageMetadataResource();
+ Vertex vertex = resource.getGraph().getVertices(
RelationshipProperty.NAME.getName(), processInstance).iterator().next();
String vertexId = String.valueOf(vertex.getId());
- int expectedSize = 6; // 2 output feeds, user, cluster, process entity, tag
- List<String> expected = Arrays.asList(FALCON_USER, CLUSTER_ENTITY_NAME, "Critical",
- PROCESS_ENTITY_NAME, "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z");
+ int expectedSize = 7; // 2 output feeds, user, cluster, process entity, tag, pipeline
+ List<String> expected = Arrays.asList(MetadataTestContext.FALCON_USER,
+ MetadataTestContext.CLUSTER_ENTITY_NAME, "Critical",
+ MetadataTestContext.PROCESS_ENTITY_NAME,
+ "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z");
verifyVertexEdges(vertexId, LineageMetadataResource.OUT, expectedSize, expected);
verifyVertexEdgesCount(vertexId, LineageMetadataResource.OUT_COUNT, expectedSize);
@@ -244,8 +185,9 @@ public class LineageMetadataResourceTest {
@Test
public void testVertexEdgesForIdAndDirectionIn() throws Exception {
- String processInstance = PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
- Vertex vertex = service.getGraph().getVertices(
+ String processInstance = MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
+ LineageMetadataResource resource = new LineageMetadataResource();
+ Vertex vertex = resource.getGraph().getVertices(
RelationshipProperty.NAME.getName(), processInstance).iterator().next();
String vertexId = String.valueOf(vertex.getId());
@@ -260,14 +202,17 @@ public class LineageMetadataResourceTest {
@Test
public void testVertexEdgesForIdAndDirectionBoth() throws Exception {
- String processInstance = PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
- Vertex vertex = service.getGraph().getVertices(
+ String processInstance = MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
+ LineageMetadataResource resource = new LineageMetadataResource();
+ Vertex vertex = resource.getGraph().getVertices(
RelationshipProperty.NAME.getName(), processInstance).iterator().next();
String vertexId = String.valueOf(vertex.getId());
- int expectedSize = 8;
- List<String> expected = Arrays.asList(FALCON_USER, CLUSTER_ENTITY_NAME, "Critical",
- PROCESS_ENTITY_NAME, "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z",
+ int expectedSize = 9;
+ List<String> expected = Arrays.asList(MetadataTestContext.FALCON_USER,
+ MetadataTestContext.CLUSTER_ENTITY_NAME, "Critical",
+ MetadataTestContext.PROCESS_ENTITY_NAME,
+ "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z",
"impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z");
verifyVertexEdges(vertexId, LineageMetadataResource.BOTH, expectedSize, expected);
@@ -276,8 +221,6 @@ public class LineageMetadataResourceTest {
verifyVertexEdgesCount(vertexId, LineageMetadataResource.BOTH_IDS, expectedSize);
}
-
-
@Test (expectedExceptions = WebApplicationException.class)
public void testVertexEdgesForIdAndInvalidDirection() throws Exception {
LineageMetadataResource resource = new LineageMetadataResource();
@@ -324,15 +267,15 @@ public class LineageMetadataResourceTest {
@Test
public void testEdgesById() throws Exception {
- String processInstance = PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
- Vertex vertex = service.getGraph().getVertices(
+ String processInstance = MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
+ LineageMetadataResource resource = new LineageMetadataResource();
+ Vertex vertex = resource.getGraph().getVertices(
RelationshipProperty.NAME.getName(), processInstance).iterator().next();
Edge edge = vertex.getEdges(Direction.OUT,
RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()).iterator().next();
Vertex toVertex = edge.getVertex(Direction.IN);
- LineageMetadataResource resource = new LineageMetadataResource();
Response response = resource.getEdge(String.valueOf(edge.getId()));
Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
@@ -352,7 +295,7 @@ public class LineageMetadataResourceTest {
Map results = (Map) JSONValue.parse(response.getEntity().toString());
long totalSize = Long.valueOf(results.get(LineageMetadataResource.TOTAL_SIZE).toString());
- Assert.assertEquals(totalSize, getVerticesCount(service.getGraph()));
+ Assert.assertEquals(totalSize, getVerticesCount(resource.getGraph()));
}
@Test
@@ -363,7 +306,7 @@ public class LineageMetadataResourceTest {
Map results = (Map) JSONValue.parse(response.getEntity().toString());
long totalSize = Long.valueOf(results.get(LineageMetadataResource.TOTAL_SIZE).toString());
- Assert.assertEquals(totalSize, getEdgesCount(service.getGraph()));
+ Assert.assertEquals(totalSize, getEdgesCount(resource.getGraph()));
}
@Test (expectedExceptions = WebApplicationException.class)
@@ -406,7 +349,7 @@ public class LineageMetadataResourceTest {
Assert.assertEquals(response.getEntity().toString(), "Lineage Metadata Service is not enabled.");
} finally {
Services.get().register(new WorkflowJobEndNotificationService());
- Services.get().register(service);
+ Services.get().register(testContext.getService());
}
}
@@ -445,16 +388,16 @@ public class LineageMetadataResourceTest {
}
private void assertProcessInstanceRelationships(Map vertexProperties) {
- Assert.assertEquals(vertexProperties.get(RelationshipLabel.USER.getName()), FALCON_USER);
+ Assert.assertEquals(vertexProperties.get(RelationshipLabel.USER.getName()), MetadataTestContext.FALCON_USER);
Assert.assertEquals(vertexProperties.get(RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()),
- CLUSTER_ENTITY_NAME);
+ MetadataTestContext.CLUSTER_ENTITY_NAME);
Assert.assertEquals(vertexProperties.get("classified-as"), "Critical");
}
private void assertFeedInstanceRelationships(Map vertexProperties, boolean skipRelationships) {
- Assert.assertEquals(vertexProperties.get(RelationshipLabel.USER.getName()), FALCON_USER);
+ Assert.assertEquals(vertexProperties.get(RelationshipLabel.USER.getName()), MetadataTestContext.FALCON_USER);
Assert.assertEquals(vertexProperties.get(RelationshipLabel.FEED_CLUSTER_EDGE.getName()),
- CLUSTER_ENTITY_NAME);
+ MetadataTestContext.CLUSTER_ENTITY_NAME);
if (!skipRelationships) {
Assert.assertEquals(vertexProperties.get(RelationshipLabel.GROUPS.getName()), "analytics");
@@ -462,113 +405,6 @@ public class LineageMetadataResourceTest {
}
}
- public void addClusterEntity() throws Exception {
- clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME,
- COLO_NAME, "classification=production");
- configStore.publish(EntityType.CLUSTER, clusterEntity);
- }
-
- public void addFeedEntity() throws Exception {
- Feed impressionsFeed = EntityBuilderTestUtil.buildFeed("impression-feed", clusterEntity,
- "classified-as=Secure", "analytics");
- addStorage(impressionsFeed, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}${MONTH}${DAY}");
- configStore.publish(EntityType.FEED, impressionsFeed);
- inputFeeds.add(impressionsFeed);
-
- Feed clicksFeed = EntityBuilderTestUtil.buildFeed("clicks-feed", clusterEntity, null, null);
- addStorage(clicksFeed, Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}${MONTH}${DAY}");
- configStore.publish(EntityType.FEED, clicksFeed);
- inputFeeds.add(clicksFeed);
-
- Feed join1Feed = EntityBuilderTestUtil.buildFeed("imp-click-join1", clusterEntity,
- "classified-as=Financial", "reporting,bi");
- addStorage(join1Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
- configStore.publish(EntityType.FEED, join1Feed);
- outputFeeds.add(join1Feed);
-
- Feed join2Feed = EntityBuilderTestUtil.buildFeed("imp-click-join2", clusterEntity,
- "classified-as=Secure,classified-as=Financial", "reporting,bi");
- addStorage(join2Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
- configStore.publish(EntityType.FEED, join2Feed);
- outputFeeds.add(join2Feed);
- }
-
- public static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
- if (storageType == Storage.TYPE.FILESYSTEM) {
- Locations locations = new Locations();
- feed.setLocations(locations);
-
- Location location = new Location();
- location.setType(LocationType.DATA);
- location.setPath(uriTemplate);
- feed.getLocations().getLocations().add(location);
- } else {
- CatalogTable table = new CatalogTable();
- table.setUri(uriTemplate);
- feed.setTable(table);
- }
- }
-
- public void addProcessEntity() throws Exception {
- Process processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME,
- clusterEntity, "classified-as=Critical");
- EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
-
- for (Feed inputFeed : inputFeeds) {
- EntityBuilderTestUtil.addInput(processEntity, inputFeed);
- }
-
- for (Feed outputFeed : outputFeeds) {
- EntityBuilderTestUtil.addOutput(processEntity, outputFeed);
- }
-
- configStore.publish(EntityType.PROCESS, processEntity);
- }
-
- public void addInstance() throws Exception {
- WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
- WorkflowExecutionContext.Type.POST_PROCESSING);
- service.onSuccess(context);
- }
-
- private static String[] getTestMessageArgs() {
- return new String[]{
- "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME,
- "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"),
- "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME,
- "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
- "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION,
-
- "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
- "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
-
- "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
- "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
-
- "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
- "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
- "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
- "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
- "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
-
- "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
- "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
- "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME,
- "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
- "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
-
-
- "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), "blah",
- "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
- "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), "blah",
- "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
- "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
-
- "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
- "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
- };
- }
-
private long getVerticesCount(final Graph graph) {
long count = 0;
for (Vertex ignored : graph.getVertices()) {
@@ -586,25 +422,4 @@ public class LineageMetadataResourceTest {
return count;
}
-
- private void cleanupGraphStore(Graph graph) {
- for (Edge edge : graph.getEdges()) {
- graph.removeEdge(edge);
- }
-
- for (Vertex vertex : graph.getVertices()) {
- graph.removeVertex(vertex);
- }
-
- graph.shutdown();
- }
-
- private static void cleanupConfigurationStore(ConfigurationStore store) throws Exception {
- for (EntityType type : EntityType.values()) {
- Collection<String> entities = store.getEntities(type);
- for (String entity : entities) {
- store.remove(type, entity);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java
new file mode 100644
index 0000000..0198172
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.metadata;
+
+import org.json.simple.JSONValue;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Unit tests for org.apache.falcon.resource.metadata.MetadataMappingResource.
+ */
+public class MetadataDiscoveryResourceTest {
+
+ private MetadataTestContext testContext;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ testContext = new MetadataTestContext();
+ testContext.setUp();
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ testContext.tearDown();
+ }
+
+ @Test
+ public void testListDimensionsFeed() throws Exception {
+ MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
+ Response response = resource.listDimensionValues("feed_entity", MetadataTestContext.CLUSTER_ENTITY_NAME);
+ Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+ Map results = (Map) JSONValue.parse(response.getEntity().toString());
+ Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 4);
+ List dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+ Assert.assertTrue(dimensions.contains("impression-feed"));
+
+ response = resource.listDimensionValues("feed_entity", null);
+ results = (Map) JSONValue.parse(response.getEntity().toString());
+ dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+ Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 4);
+ Assert.assertTrue(dimensions.contains("impression-feed"));
+ }
+
+ @Test
+ public void testListDimensionsProcess() throws Exception {
+ MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
+ Response response = resource.listDimensionValues("process_entity",
+ MetadataTestContext.CLUSTER_ENTITY_NAME);
+ Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+ Map results = (Map) JSONValue.parse(response.getEntity().toString());
+ Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+ List dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+ Assert.assertEquals(dimensions.get(0), MetadataTestContext.PROCESS_ENTITY_NAME);
+
+ response = resource.listDimensionValues("process_entity", null);
+ results = (Map) JSONValue.parse(response.getEntity().toString());
+ Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+ }
+
+ @Test
+ public void testListDimensionsCluster() throws Exception {
+ MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
+ Response response = resource.listDimensionValues("cluster_entity", MetadataTestContext.CLUSTER_ENTITY_NAME);
+ Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+ Map results = (Map) JSONValue.parse(response.getEntity().toString());
+ Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+ List dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+ Assert.assertEquals(dimensions.get(0), MetadataTestContext.CLUSTER_ENTITY_NAME);
+
+ response = resource.listDimensionValues("cluster_entity", null);
+ results = (Map) JSONValue.parse(response.getEntity().toString());
+ dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+ Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+ Assert.assertEquals(dimensions.get(0), MetadataTestContext.CLUSTER_ENTITY_NAME);
+ }
+
+ @Test
+ public void testListDimensionsColo() throws Exception {
+ MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
+ Response response = resource.listDimensionValues("colo", MetadataTestContext.CLUSTER_ENTITY_NAME);
+ Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+ Map results = (Map) JSONValue.parse(response.getEntity().toString());
+ Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+ List dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+ Assert.assertEquals(dimensions.get(0), MetadataTestContext.COLO_NAME);
+
+ response = resource.listDimensionValues("colo", null);
+ results = (Map) JSONValue.parse(response.getEntity().toString());
+ dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+ Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+ Assert.assertEquals(dimensions.get(0), MetadataTestContext.COLO_NAME);
+
+ try {
+ resource.listDimensionValues("INVALID", null);
+ Assert.assertTrue(false);
+ } catch (Exception e) {
+ Assert.assertTrue(true);
+ }
+ }
+
+ @Test
+ public void testListDimensionsPipelines() throws Exception {
+ MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
+ Response response = resource.listDimensionValues("pipelines", null);
+ Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+ Map results = (Map) JSONValue.parse(response.getEntity().toString());
+ List dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+ Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+ Assert.assertEquals(dimensions.get(0), "testPipeline");
+
+ response = resource.listDimensionValues("pipelines", MetadataTestContext.CLUSTER_ENTITY_NAME);
+ results = (Map) JSONValue.parse(response.getEntity().toString());
+ Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 0);
+ }
+
+ @Test
+ public void testListDimensionsTags() throws Exception {
+ MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
+ Response response = resource.listDimensionValues("tags", null);
+ Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+ Map results = (Map) JSONValue.parse(response.getEntity().toString());
+ Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 4);
+ List dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+ Assert.assertTrue(dimensions.contains("production"));
+
+ response = resource.listDimensionValues("tags", MetadataTestContext.CLUSTER_ENTITY_NAME);
+ results = (Map) JSONValue.parse(response.getEntity().toString());
+ Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
new file mode 100644
index 0000000..aaddf62
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.metadata;
+
+import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.Graph;
+import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.*;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.metadata.MetadataMappingService;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.testng.Assert;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Base test class for Metadata Rest API.
+ */
+public class MetadataTestContext {
+ public static final String FALCON_USER = "falcon-user";
+ private static final String LOGS_DIR = "target/log";
+ private static final String NOMINAL_TIME = "2014-01-01-01-00";
+ public static final String OPERATION = "GENERATE";
+
+ public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+ public static final String PROCESS_ENTITY_NAME = "sample-process";
+ public static final String COLO_NAME = "west-coast";
+ public static final String WORKFLOW_NAME = "imp-click-join-workflow";
+ public static final String WORKFLOW_VERSION = "1.0.9";
+
+ public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
+ public static final String INPUT_INSTANCE_PATHS =
+ "jail://global:00/falcon/impression-feed/20140101#jail://global:00/falcon/clicks-feed/20140101";
+
+ public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
+ public static final String OUTPUT_INSTANCE_PATHS =
+ "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
+
+ private ConfigurationStore configStore;
+ private MetadataMappingService service;
+
+ private Cluster clusterEntity;
+ private List<Feed> inputFeeds = new ArrayList<Feed>();
+ private List<Feed> outputFeeds = new ArrayList<Feed>();
+
+ public MetadataTestContext() {}
+
+ public void setUp() throws Exception {
+ CurrentUser.authenticate(FALCON_USER);
+
+ configStore = ConfigurationStore.get();
+
+ Services.get().register(new WorkflowJobEndNotificationService());
+ Assert.assertTrue(Services.get().isRegistered(WorkflowJobEndNotificationService.SERVICE_NAME));
+
+ StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
+ service = new MetadataMappingService();
+ service.init();
+ Services.get().register(service);
+ Assert.assertTrue(Services.get().isRegistered(MetadataMappingService.SERVICE_NAME));
+
+ addClusterEntity();
+ addFeedEntity();
+ addProcessEntity();
+ addInstance();
+ }
+
+ public MetadataMappingService getService() {
+ return this.service;
+ }
+
+ public void tearDown() throws Exception {
+ cleanupGraphStore(service.getGraph());
+ cleanupConfigurationStore(configStore);
+
+ service.destroy();
+
+ StartupProperties.get().setProperty("falcon.graph.preserve.history", "false");
+ Services.get().reset();
+ }
+
+ public void addClusterEntity() throws Exception {
+ clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME,
+ COLO_NAME, "classification=production");
+ configStore.publish(EntityType.CLUSTER, clusterEntity);
+ }
+
+ public void addFeedEntity() throws Exception {
+ Feed impressionsFeed = EntityBuilderTestUtil.buildFeed("impression-feed", clusterEntity,
+ "classified-as=Secure", "analytics");
+ addStorage(impressionsFeed, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}${MONTH}${DAY}");
+ configStore.publish(EntityType.FEED, impressionsFeed);
+ inputFeeds.add(impressionsFeed);
+
+ Feed clicksFeed = EntityBuilderTestUtil.buildFeed("clicks-feed", clusterEntity, null, null);
+ addStorage(clicksFeed, Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}${MONTH}${DAY}");
+ configStore.publish(EntityType.FEED, clicksFeed);
+ inputFeeds.add(clicksFeed);
+
+ Feed join1Feed = EntityBuilderTestUtil.buildFeed("imp-click-join1", clusterEntity,
+ "classified-as=Financial", "reporting,bi");
+ addStorage(join1Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+ configStore.publish(EntityType.FEED, join1Feed);
+ outputFeeds.add(join1Feed);
+
+ Feed join2Feed = EntityBuilderTestUtil.buildFeed("imp-click-join2", clusterEntity,
+ "classified-as=Secure,classified-as=Financial", "reporting,bi");
+ addStorage(join2Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
+ configStore.publish(EntityType.FEED, join2Feed);
+ outputFeeds.add(join2Feed);
+ }
+
+ public static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
+ if (storageType == Storage.TYPE.FILESYSTEM) {
+ Locations locations = new Locations();
+ feed.setLocations(locations);
+
+ Location location = new Location();
+ location.setType(LocationType.DATA);
+ location.setPath(uriTemplate);
+ feed.getLocations().getLocations().add(location);
+ } else {
+ CatalogTable table = new CatalogTable();
+ table.setUri(uriTemplate);
+ feed.setTable(table);
+ }
+ }
+
+ public void addProcessEntity() throws Exception {
+ org.apache.falcon.entity.v0.process.Process processEntity =
+ EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME,
+ clusterEntity, "classified-as=Critical", "testPipeline");
+ EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
+
+ for (Feed inputFeed : inputFeeds) {
+ EntityBuilderTestUtil.addInput(processEntity, inputFeed);
+ }
+
+ for (Feed outputFeed : outputFeeds) {
+ EntityBuilderTestUtil.addOutput(processEntity, outputFeed);
+ }
+
+ configStore.publish(EntityType.PROCESS, processEntity);
+ }
+
+ public void addInstance() throws Exception {
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
+ WorkflowExecutionContext.Type.POST_PROCESSING);
+ service.onSuccess(context);
+ }
+
+ private void cleanupGraphStore(Graph graph) {
+ for (Edge edge : graph.getEdges()) {
+ graph.removeEdge(edge);
+ }
+
+ for (Vertex vertex : graph.getVertices()) {
+ graph.removeVertex(vertex);
+ }
+
+ graph.shutdown();
+ }
+
+ private static void cleanupConfigurationStore(ConfigurationStore store) throws Exception {
+ for (EntityType type : EntityType.values()) {
+ Collection<String> entities = store.getEntities(type);
+ for (String entity : entities) {
+ store.remove(type, entity);
+ }
+ }
+ }
+
+ private static String[] getTestMessageArgs() {
+ return new String[]{
+ "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME,
+ "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"),
+ "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME,
+ "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
+ "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION,
+ "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
+ "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+ "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+ "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+ "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
+ "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+ "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+ "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
+ "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
+ "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME,
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
+ "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
+ "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), "blah",
+ "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+ "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), "blah",
+ "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+ "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
+ "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
+ "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index caee676..31855bc 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -20,6 +20,7 @@ package org.apache.falcon.cli;
import org.apache.commons.io.FilenameUtils;
import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.metadata.RelationshipType;
import org.apache.falcon.resource.TestContext;
import org.apache.falcon.util.OozieTestUtils;
import org.testng.Assert;
@@ -708,6 +709,42 @@ public class FalconCLIIT {
executeWithURL("instance -running -type process -name " + overlay.get("processName")));
}
+ @Test
+ public void testMetadataListCommands() throws Exception {
+ TestContext context = new TestContext();
+ Map<String, String> overlay = context.getUniqueOverlay();
+ submitTestFiles(context, overlay);
+
+ String processName = overlay.get("processName");
+ String feedName = overlay.get("outputFeedName");
+ String clusterName = overlay.get("cluster");
+
+ Assert.assertEquals(0,
+ executeWithURL(FalconCLI.ENTITY_CMD + " -" + FalconCLI.SCHEDULE_OPT + " -"
+ + FalconCLI.ENTITY_TYPE_OPT + " process -" + FalconCLI.ENTITY_NAME_OPT + " " + processName));
+
+ Assert.assertEquals(0,
+ executeWithURL(FalconCLI.ENTITY_CMD + " -" + FalconCLI.SCHEDULE_OPT + " -"
+ + FalconCLI.ENTITY_TYPE_OPT + " feed -" + FalconCLI.ENTITY_NAME_OPT + " " + feedName));
+
+ OozieTestUtils.waitForProcessWFtoStart(context);
+
+ String metadataListCommand = FalconCLI.METADATA_CMD + " -" + FalconMetadataCLI.LIST_OPT + " -"
+ + FalconMetadataCLI.TYPE_OPT + " ";
+ String clusterString = " -" + FalconMetadataCLI.CLUSTER_OPT + " " + clusterName;
+ Assert.assertEquals(0, executeWithURL(metadataListCommand + RelationshipType.CLUSTER_ENTITY.toString()));
+ Assert.assertEquals(0, executeWithURL(metadataListCommand + RelationshipType.PROCESS_ENTITY.toString()));
+ Assert.assertEquals(0, executeWithURL(metadataListCommand + RelationshipType.FEED_ENTITY.toString()));
+ Assert.assertEquals(0, executeWithURL(metadataListCommand + RelationshipType.PROCESS_ENTITY.toString()
+ + clusterString));
+ Assert.assertEquals(0, executeWithURL(metadataListCommand + RelationshipType.FEED_ENTITY.toString()
+ + clusterString));
+ Assert.assertEquals(0, executeWithURL(metadataListCommand + RelationshipType.CLUSTER_ENTITY.toString()
+ + clusterString));
+
+ Assert.assertEquals(-1, executeWithURL(metadataListCommand + "feed"));
+ Assert.assertEquals(-1, executeWithURL(metadataListCommand + "invalid"));
+ }
public void testContinue() throws Exception {
TestContext context = new TestContext();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
index 89132d6..0f2a971 100644
--- a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
+++ b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
@@ -54,7 +54,7 @@ public class PigProcessIT {
@BeforeClass
public void prepare() throws Exception {
- TestContext.prepare(CLUSTER_TEMPLATE);
+ TestContext.prepare(CLUSTER_TEMPLATE, true);
overlay = context.getUniqueOverlay();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
index 91662d4..51afbb8 100644
--- a/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
+++ b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
@@ -66,7 +66,7 @@ public class TableStorageProcessIT {
@BeforeClass
public void prepare() throws Exception {
- TestContext.prepare(CLUSTER_TEMPLATE);
+ TestContext.prepare(CLUSTER_TEMPLATE, true);
overlay = context.getUniqueOverlay();
String filePath = TestContext.overlayParametersOverTemplate(CLUSTER_TEMPLATE, overlay);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/webapp/src/test/java/org/apache/falcon/resource/MetadataResourceJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/MetadataResourceJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/MetadataResourceJerseyIT.java
new file mode 100644
index 0000000..462bcdc
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/MetadataResourceJerseyIT.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.resource.metadata.AbstractMetadataResource;
+import org.json.simple.JSONValue;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test class for Metadata REST APIs.
+ *
+ * Tests should be enabled only in local environments as they need running instance of the web server.
+ */
+public class MetadataResourceJerseyIT {
+
+ private TestContext context;
+
+ @BeforeClass
+ public void prepare() throws Exception {
+ context = newContext();
+ ClientResponse response;
+ Map<String, String> overlay = context.getUniqueOverlay();
+
+ response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+ context.assertSuccessful(response);
+
+ response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+ context.assertSuccessful(response);
+
+ response = context.submitToFalcon(TestContext.FEED_TEMPLATE2, overlay, EntityType.FEED);
+ context.assertSuccessful(response);
+
+ response = context.submitToFalcon(TestContext.PROCESS_TEMPLATE, overlay, EntityType.PROCESS);
+ context.assertSuccessful(response);
+ }
+
+ @Test
+ public void testMetadataList() throws Exception {
+
+ ClientResponse response = context.service
+ .path("api/metadata/cluster_entity/list")
+ .header("Cookie", context.getAuthenticationToken())
+ .accept(MediaType.APPLICATION_JSON)
+ .get(ClientResponse.class);
+ Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+ Map results = (Map) JSONValue.parse(response.getEntity(String.class));
+ List dimensions = (List) results.get(AbstractMetadataResource.RESULTS);
+ Assert.assertTrue(dimensions.contains(context.clusterName));
+
+ response = context.service
+ .path("api/metadata/process_entity/list")
+ .queryParam("cluster", context.clusterName)
+ .header("Cookie", context.getAuthenticationToken())
+ .accept(MediaType.APPLICATION_JSON)
+ .get(ClientResponse.class);
+ Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+ results = (Map) JSONValue.parse(response.getEntity(String.class));
+ dimensions = (List) results.get(AbstractMetadataResource.RESULTS);
+ Assert.assertTrue(dimensions.contains(context.processName));
+
+ response = context.service
+ .path("api/metadata/process_entity/list")
+ .queryParam("cluster", "random")
+ .header("Cookie", context.getAuthenticationToken())
+ .accept(MediaType.APPLICATION_JSON)
+ .get(ClientResponse.class);
+ Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+ results = (Map) JSONValue.parse(response.getEntity(String.class));
+ Assert.assertEquals(((Long)results.get(AbstractMetadataResource.TOTAL_SIZE)).longValue(), 0);
+ }
+
+ private ThreadLocal<TestContext> contexts = new ThreadLocal<TestContext>();
+
+ private TestContext newContext() throws Exception {
+ TestContext.prepare(TestContext.CLUSTER_TEMPLATE, false);
+ contexts.set(new TestContext());
+ return contexts.get();
+ }
+}