Posted to commits@atlas.apache.org by sh...@apache.org on 2016/05/18 12:25:40 UTC
[2/2] incubator-atlas git commit: ATLAS-713 Entity lineage based on entity id (shwethags)
ATLAS-713 Entity lineage based on entity id (shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/b65dd91c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/b65dd91c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/b65dd91c
Branch: refs/heads/master
Commit: b65dd91c3587d35abafc4ec136e162f9a5c92ac1
Parents: 857561a
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Wed May 18 17:55:24 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Wed May 18 17:55:24 2016 +0530
----------------------------------------------------------------------
.../main/java/org/apache/atlas/AtlasClient.java | 41 +-
dashboardv2/public/js/models/VLineage.js | 4 +-
dashboardv2/public/js/models/VSchema.js | 4 +-
.../views/detail_page/DetailPageLayoutView.js | 8 +-
.../public/js/views/graph/LineageLayoutView.js | 4 +-
.../public/js/views/schema/SchemaLayoutView.js | 2 +-
distro/src/conf/atlas-application.properties | 10 +-
release-log.txt | 1 +
.../apache/atlas/RepositoryMetadataModule.java | 4 +-
.../atlas/discovery/DataSetLineageService.java | 215 +++++++++
.../atlas/discovery/HiveLineageService.java | 222 ---------
.../org/apache/atlas/query/ClosureQuery.scala | 44 +-
.../apache/atlas/BaseHiveRepositoryTest.java | 377 ----------------
.../org/apache/atlas/BaseRepositoryTest.java | 377 ++++++++++++++++
.../discovery/DataSetLineageServiceTest.java | 447 +++++++++++++++++++
.../GraphBackedDiscoveryServiceTest.java | 4 +-
.../atlas/discovery/HiveLineageServiceTest.java | 260 -----------
.../org/apache/atlas/query/GremlinTest2.scala | 8 +-
.../apache/atlas/discovery/LineageService.java | 44 +-
.../main/resources/atlas-application.properties | 8 +-
.../web/resources/DataSetLineageResource.java | 162 +++++++
.../web/resources/HiveLineageResource.java | 166 -------
.../atlas/web/resources/LineageResource.java | 153 +++++++
.../DataSetLineageJerseyResourceIT.java | 306 +++++++++++++
.../resources/HiveLineageJerseyResourceIT.java | 257 -----------
25 files changed, 1768 insertions(+), 1360 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index b3ec95c..7e32cc2 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -90,7 +90,8 @@ public class AtlasClient {
public static final String URI_ENTITY = "entities";
public static final String URI_ENTITY_AUDIT = "audit";
public static final String URI_SEARCH = "discovery/search";
- public static final String URI_LINEAGE = "lineage/hive/table";
+ public static final String URI_NAME_LINEAGE = "lineage/hive/table";
+ public static final String URI_LINEAGE = "lineage/";
public static final String URI_TRAITS = "traits";
public static final String QUERY = "query";
@@ -416,7 +417,12 @@ public class AtlasClient {
SEARCH_GREMLIN(BASE_URI + URI_SEARCH + "/gremlin", HttpMethod.GET, Response.Status.OK),
SEARCH_FULL_TEXT(BASE_URI + URI_SEARCH + "/fulltext", HttpMethod.GET, Response.Status.OK),
- //Lineage operations
+ //Lineage operations based on dataset name
+ NAME_LINEAGE_INPUTS_GRAPH(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK),
+ NAME_LINEAGE_OUTPUTS_GRAPH(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK),
+ NAME_LINEAGE_SCHEMA(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK),
+
+ //Lineage operations based on entity id of the dataset
LINEAGE_INPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK),
LINEAGE_OUTPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK),
LINEAGE_SCHEMA(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK);
@@ -988,7 +994,7 @@ public class AtlasClient {
}
public JSONObject getInputGraph(String datasetName) throws AtlasServiceException {
- JSONObject response = callAPI(API.LINEAGE_INPUTS_GRAPH, null, datasetName, "/inputs/graph");
+ JSONObject response = callAPI(API.NAME_LINEAGE_INPUTS_GRAPH, null, datasetName, "/inputs/graph");
try {
return response.getJSONObject(AtlasClient.RESULTS);
} catch (JSONException e) {
@@ -997,7 +1003,34 @@ public class AtlasClient {
}
public JSONObject getOutputGraph(String datasetName) throws AtlasServiceException {
- JSONObject response = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetName, "/outputs/graph");
+ JSONObject response = callAPI(API.NAME_LINEAGE_OUTPUTS_GRAPH, null, datasetName, "/outputs/graph");
+ try {
+ return response.getJSONObject(AtlasClient.RESULTS);
+ } catch (JSONException e) {
+ throw new AtlasServiceException(e);
+ }
+ }
+
+ public JSONObject getInputGraphForEntity(String entityId) throws AtlasServiceException {
+ JSONObject response = callAPI(API.LINEAGE_INPUTS_GRAPH, null, entityId, "/inputs/graph");
+ try {
+ return response.getJSONObject(AtlasClient.RESULTS);
+ } catch (JSONException e) {
+ throw new AtlasServiceException(e);
+ }
+ }
+
+ public JSONObject getOutputGraphForEntity(String datasetId) throws AtlasServiceException {
+ JSONObject response = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetId, "/outputs/graph");
+ try {
+ return response.getJSONObject(AtlasClient.RESULTS);
+ } catch (JSONException e) {
+ throw new AtlasServiceException(e);
+ }
+ }
+
+ public JSONObject getSchemaForEntity(String datasetId) throws AtlasServiceException {
+ JSONObject response = callAPI(API.LINEAGE_SCHEMA, null, datasetId, "/schema");
try {
return response.getJSONObject(AtlasClient.RESULTS);
} catch (JSONException e) {
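For reference, a minimal usage sketch of the new guid-based client calls added above. This is illustrative only: the server URL and guid are hypothetical, and the single-argument AtlasClient constructor is assumed to be available in this release. The name-based calls remain usable alongside the new ones.

    import org.apache.atlas.AtlasClient;
    import org.codehaus.jettison.json.JSONObject;

    public class EntityLineageExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical Atlas server URL and DataSet entity id (guid).
            AtlasClient client = new AtlasClient("http://localhost:21000");
            String guid = "3a0c9f1e-5b7d-4d2a-9c4e-111122223333";

            // guid-based lineage, backed by the new lineage/{guid}/... endpoints
            JSONObject inputs = client.getInputGraphForEntity(guid);
            JSONObject outputs = client.getOutputGraphForEntity(guid);
            JSONObject schema = client.getSchemaForEntity(guid);

            // name-based lineage still goes through lineage/hive/table/{name}/...
            JSONObject byName = client.getInputGraph("sales_fact_daily_mv");
        }
    }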
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/models/VLineage.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/models/VLineage.js b/dashboardv2/public/js/models/VLineage.js
index e33488a..fa1be05 100644
--- a/dashboardv2/public/js/models/VLineage.js
+++ b/dashboardv2/public/js/models/VLineage.js
@@ -23,7 +23,7 @@ define(['require',
'use strict';
var VLineage = VBaseModel.extend({
- urlRoot: Globals.baseURL + 'api/atlas/lineage/hive/table/assetName/outputs/graph',
+ urlRoot: Globals.baseURL + 'api/atlas/lineage/assetName/outputs/graph',
defaults: {},
@@ -36,7 +36,7 @@ define(['require',
this.bindErrorEvents();
},
toString: function() {
- return this.get('name');
+ return this.get('id');
},
}, {});
return VLineage;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/models/VSchema.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/models/VSchema.js b/dashboardv2/public/js/models/VSchema.js
index 1f8e0bb..24462e6 100644
--- a/dashboardv2/public/js/models/VSchema.js
+++ b/dashboardv2/public/js/models/VSchema.js
@@ -22,7 +22,7 @@ define(['require',
], function(require, Globals, VBaseModel) {
'use strict';
var VSchema = VBaseModel.extend({
- urlRoot: Globals.baseURL + '/api/atlas/lineage/hive/table/log_fact_daily_mv/schema',
+ urlRoot: Globals.baseURL + '/api/atlas/lineage/log_fact_daily_mv/schema',
defaults: {},
@@ -35,7 +35,7 @@ define(['require',
this.bindErrorEvents();
},
toString: function() {
- return this.get('name');
+ return this.get('id');
},
}, {});
return VSchema;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js b/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
index 87adec0..0932208 100644
--- a/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
+++ b/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
@@ -92,7 +92,7 @@ define(['require',
this.renderEntityDetailTableLayoutView();
this.renderTagTableLayoutView(tagGuid);
this.renderLineageLayoutView(tagGuid);
- this.renderSchemaLayoutView();
+ this.renderSchemaLayoutView(tagGuid);
}, this);
},
onRender: function() {},
@@ -120,17 +120,17 @@ define(['require',
require(['views/graph/LineageLayoutView'], function(LineageLayoutView) {
that.RLineageLayoutView.show(new LineageLayoutView({
globalVent: that.globalVent,
- assetName: that.name,
+ assetName: tagGuid,
guid: tagGuid
}));
});
},
- renderSchemaLayoutView: function() {
+ renderSchemaLayoutView: function(tagGuid) {
var that = this;
require(['views/schema/SchemaLayoutView'], function(SchemaLayoutView) {
that.RSchemaTableLayoutView.show(new SchemaLayoutView({
globalVent: that.globalVent,
- name: that.name,
+ name: tagGuid,
vent: that.vent
}));
});
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/views/graph/LineageLayoutView.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/views/graph/LineageLayoutView.js b/dashboardv2/public/js/views/graph/LineageLayoutView.js
index 973d091..31433c1 100644
--- a/dashboardv2/public/js/views/graph/LineageLayoutView.js
+++ b/dashboardv2/public/js/views/graph/LineageLayoutView.js
@@ -56,8 +56,8 @@ define(['require',
this.inputCollection = new VLineageList();
this.outputCollection = new VLineageList();
this.entityModel = new VEntity();
- this.inputCollection.url = "/api/atlas/lineage/hive/table/" + this.assetName + "/inputs/graph";
- this.outputCollection.url = "/api/atlas/lineage/hive/table/" + this.assetName + "/outputs/graph";
+ this.inputCollection.url = "/api/atlas/lineage/" + this.assetName + "/inputs/graph";
+ this.outputCollection.url = "/api/atlas/lineage/" + this.assetName + "/outputs/graph";
this.bindEvents();
this.fetchGraphData();
this.data = {};
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/views/schema/SchemaLayoutView.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/views/schema/SchemaLayoutView.js b/dashboardv2/public/js/views/schema/SchemaLayoutView.js
index de558a7..301b993 100644
--- a/dashboardv2/public/js/views/schema/SchemaLayoutView.js
+++ b/dashboardv2/public/js/views/schema/SchemaLayoutView.js
@@ -73,7 +73,7 @@ define(['require',
initialize: function(options) {
_.extend(this, _.pick(options, 'globalVent', 'name', 'vent'));
this.schemaCollection = new VSchemaList([], {});
- this.schemaCollection.url = "/api/atlas/lineage/hive/table/" + this.name + "/schema";
+ this.schemaCollection.url = "/api/atlas/lineage/" + this.name + "/schema";
this.commonTableOptions = {
collection: this.schemaCollection,
includeFilter: false,
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 68a0021..d4722fb 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -63,15 +63,9 @@ atlas.kafka.auto.commit.enable=false
######### Hive Lineage Configs #########
-# This models reflects the base super types for Data and Process
-#atlas.lineage.hive.table.type.name=DataSet
-#atlas.lineage.hive.process.type.name=Process
-#atlas.lineage.hive.process.inputs.name=inputs
-#atlas.lineage.hive.process.outputs.name=outputs
-
## Schema
-atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
-atlas.lineage.hive.table.schema.query.Table=Table where name='%s'\, columns
+atlas.lineage.schema.query.hive_table=hive_table where __guid='%s'\, columns
+atlas.lineage.schema.query.Table=Table where __guid='%s'\, columns
## Server port configuration
#atlas.server.http.port=21000
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index b600fff..a68010a 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -21,6 +21,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
+ATLAS-713 Entity lineage based on entity id (shwethags)
ATLAS-736 UI - BUG :: displaying timestamp values for hive_db description (kevalbhatt18 via yhemanth)
ATLAS-784 Configure config.store.uri for Falcon hook IT (yhemanth)
ATLAS-645 FieldMapping.output() results in stack overflow when instances reference each other (dkantor via shwethags)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
index 8dae968..68b707f 100755
--- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
+++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
@@ -26,7 +26,7 @@ import com.google.inject.throwingproviders.ThrowingProviderBinder;
import com.thinkaurelius.titan.core.TitanGraph;
import org.aopalliance.intercept.MethodInterceptor;
import org.apache.atlas.discovery.DiscoveryService;
-import org.apache.atlas.discovery.HiveLineageService;
+import org.apache.atlas.discovery.DataSetLineageService;
import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.listener.EntityChangeListener;
@@ -83,7 +83,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the DiscoveryService interface to an implementation
bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton();
- bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton();
+ bind(LineageService.class).to(DataSetLineageService.class).asEagerSingleton();
bindAuditRepository(binder());
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
new file mode 100644
index 0000000..39dde2a
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.discovery;
+
+import com.thinkaurelius.titan.core.TitanGraph;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.GraphTransaction;
+import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
+import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
+import org.apache.atlas.query.GremlinQueryResult;
+import org.apache.atlas.query.InputLineageClosureQuery;
+import org.apache.atlas.query.OutputLineageClosureQuery;
+import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.typesystem.exception.EntityNotFoundException;
+import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
+import org.apache.atlas.utils.ParamChecker;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Some;
+import scala.collection.immutable.List;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+/**
+ * LineageService implementation that operates on DataSet entities, addressed by name or by entity id (guid).
+ */
+@Singleton
+public class DataSetLineageService implements LineageService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageService.class);
+
+ private static final Option<List<String>> SELECT_ATTRIBUTES =
+ Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"}));
+ public static final String SELECT_INSTANCE_GUID = "__guid";
+
+ public static final String DATASET_SCHEMA_QUERY_PREFIX = "atlas.lineage.schema.query.";
+
+ private static final String HIVE_PROCESS_TYPE_NAME = "Process";
+ private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputs";
+ private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs";
+
+ private static final String DATASET_EXISTS_QUERY = AtlasClient.DATA_SET_SUPER_TYPE + " where __guid = '%s'";
+ private static final String DATASET_NAME_EXISTS_QUERY =
+ AtlasClient.DATA_SET_SUPER_TYPE + " where name = '%s' and __state = 'ACTIVE'";
+
+ private static final Configuration propertiesConf;
+
+ static {
+ try {
+ propertiesConf = ApplicationProperties.get();
+ } catch (AtlasException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ private final TitanGraph titanGraph;
+ private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
+ private final GraphBackedDiscoveryService discoveryService;
+
+ @Inject
+ DataSetLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository,
+ GraphBackedDiscoveryService discoveryService) throws DiscoveryException {
+ this.titanGraph = graphProvider.get();
+ this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
+ this.discoveryService = discoveryService;
+ }
+
+ /**
+ * Return the lineage outputs graph for the given datasetName.
+ *
+ * @param datasetName datasetName
+ * @return Outputs Graph as JSON
+ */
+ @Override
+ @GraphTransaction
+ public String getOutputsGraph(String datasetName) throws AtlasException {
+ LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName);
+ ParamChecker.notEmpty(datasetName, "dataset name");
+ ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
+ return getOutputsGraphForId(datasetInstance.getId()._getId());
+ }
+
+ /**
+ * Return the lineage inputs graph for the given tableName.
+ *
+ * @param tableName tableName
+ * @return Inputs Graph as JSON
+ */
+ @Override
+ @GraphTransaction
+ public String getInputsGraph(String tableName) throws AtlasException {
+ LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
+ ParamChecker.notEmpty(tableName, "table name");
+ ReferenceableInstance datasetInstance = validateDatasetNameExists(tableName);
+ return getInputsGraphForId(datasetInstance.getId()._getId());
+ }
+
+ @Override
+ public String getInputsGraphForEntity(String guid) throws AtlasException {
+ LOG.info("Fetching lineage inputs graph for entity={}", guid);
+ ParamChecker.notEmpty(guid, "Entity id");
+ validateDatasetExists(guid);
+ return getInputsGraphForId(guid);
+ }
+
+ private String getInputsGraphForId(String guid) {
+ InputLineageClosureQuery
+ inputsQuery = new InputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID,
+ guid, HIVE_PROCESS_TYPE_NAME,
+ HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
+ SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
+ return inputsQuery.graph().toInstanceJson();
+ }
+
+ @Override
+ public String getOutputsGraphForEntity(String guid) throws AtlasException {
+ LOG.info("Fetching lineage outputs graph for entity guid={}", guid);
+ ParamChecker.notEmpty(guid, "Entity id");
+ validateDatasetExists(guid);
+ return getOutputsGraphForId(guid);
+ }
+
+ private String getOutputsGraphForId(String guid) {
+ OutputLineageClosureQuery outputsQuery =
+ new OutputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID, guid, HIVE_PROCESS_TYPE_NAME,
+ HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
+ SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
+ return outputsQuery.graph().toInstanceJson();
+ }
+
+ /**
+ * Return the schema for the given dataset name.
+ *
+ * @param datasetName dataset name
+ * @return Schema as JSON
+ */
+ @Override
+ @GraphTransaction
+ public String getSchema(String datasetName) throws AtlasException {
+ ParamChecker.notEmpty(datasetName, "dataset name");
+ LOG.info("Fetching schema for datasetName={}", datasetName);
+ ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
+
+ return getSchemaForId(datasetInstance.getTypeName(), datasetInstance.getId()._getId());
+ }
+
+ private String getSchemaForId(String typeName, String guid) throws DiscoveryException {
+ final String schemaQuery =
+ String.format(propertiesConf.getString(DATASET_SCHEMA_QUERY_PREFIX + typeName), guid);
+ return discoveryService.searchByDSL(schemaQuery);
+ }
+
+ @Override
+ public String getSchemaForEntity(String guid) throws AtlasException {
+ ParamChecker.notEmpty(guid, "Entity id");
+ LOG.info("Fetching schema for entity guid={}", guid);
+ String typeName = validateDatasetExists(guid);
+ return getSchemaForId(typeName, guid);
+ }
+
+ /**
+ * Validate that an active dataset with the given name exists.
+ *
+ * @param datasetName dataset name
+ */
+ private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException {
+ final String tableExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName);
+ GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery);
+ if (!(queryResult.rows().length() > 0)) {
+ throw new EntityNotFoundException(datasetName + " does not exist");
+ }
+
+ return (ReferenceableInstance)queryResult.rows().apply(0);
+ }
+
+ /**
+ * Validate that a dataset with the given entity id exists, and return its type name.
+ *
+ * @param guid entity id
+ */
+ private String validateDatasetExists(String guid) throws AtlasException {
+ final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid);
+ GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery);
+ if (!(queryResult.rows().length() > 0)) {
+ throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
+ }
+
+ ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0);
+ return referenceable.getTypeName();
+ }
+
+}
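To make the schema path above concrete: getSchemaForEntity resolves the entity's type name, looks up the per-type DSL template under atlas.lineage.schema.query.<typeName> (see the distro properties change above), and substitutes the guid. A minimal sketch of that flow, assuming a hive_table entity and a hypothetical guid; propertiesConf and discoveryService are the fields defined in the class:

    // atlas-application.properties ships with:
    //   atlas.lineage.schema.query.hive_table=hive_table where __guid='%s'\, columns
    String guid = "3a0c9f1e-5b7d-4d2a-9c4e-111122223333";  // hypothetical
    String typeName = "hive_table";                        // from validateDatasetExists(guid)
    String template = propertiesConf.getString(DATASET_SCHEMA_QUERY_PREFIX + typeName);
    String dslQuery = String.format(template, guid);
    // dslQuery: hive_table where __guid='3a0c9f1e-5b7d-4d2a-9c4e-111122223333', columns
    String schemaJson = discoveryService.searchByDSL(dslQuery);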
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java
deleted file mode 100644
index 00905d7..0000000
--- a/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.discovery;
-
-import com.thinkaurelius.titan.core.TitanGraph;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.GraphTransaction;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
-import org.apache.atlas.utils.ParamChecker;
-import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
-import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
-import org.apache.atlas.query.Expressions;
-import org.apache.atlas.query.GremlinQueryResult;
-import org.apache.atlas.query.HiveLineageQuery;
-import org.apache.atlas.query.HiveWhereUsedQuery;
-import org.apache.atlas.repository.MetadataRepository;
-import org.apache.atlas.repository.graph.GraphProvider;
-import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
-import org.apache.commons.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Some;
-import scala.collection.immutable.List;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-/**
- * Hive implementation of Lineage service interface.
- */
-@Singleton
-public class HiveLineageService implements LineageService {
-
- private static final Logger LOG = LoggerFactory.getLogger(HiveLineageService.class);
-
- private static final Option<List<String>> SELECT_ATTRIBUTES =
- Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"}));
-
- public static final String HIVE_TABLE_SCHEMA_QUERY_PREFIX = "atlas.lineage.hive.table.schema.query.";
-
- private static final String HIVE_TABLE_TYPE_NAME;
- private static final String HIVE_PROCESS_TYPE_NAME;
- private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME;
- private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME;
-
- private static final String HIVE_TABLE_EXISTS_QUERY;
-
- private static final Configuration propertiesConf;
-
- static {
- // todo - externalize this using type system - dog food
- try {
- propertiesConf = ApplicationProperties.get();
- HIVE_TABLE_TYPE_NAME = propertiesConf.getString("atlas.lineage.hive.table.type.name", "DataSet");
- HIVE_PROCESS_TYPE_NAME = propertiesConf.getString("atlas.lineage.hive.process.type.name", "Process");
- HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = propertiesConf.getString("atlas.lineage.hive.process.inputs.name", "inputs");
- HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = propertiesConf.getString("atlas.lineage.hive.process.outputs.name", "outputs");
-
- HIVE_TABLE_EXISTS_QUERY = propertiesConf.getString("atlas.lineage.hive.table.exists.query",
- "from " + HIVE_TABLE_TYPE_NAME + " where name=\"%s\"");
- } catch (AtlasException e) {
- throw new RuntimeException(e);
- }
- }
-
-
- private final TitanGraph titanGraph;
- private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
- private final GraphBackedDiscoveryService discoveryService;
-
- @Inject
- HiveLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository,
- GraphBackedDiscoveryService discoveryService) throws DiscoveryException {
- this.titanGraph = graphProvider.get();
- this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
- this.discoveryService = discoveryService;
- }
-
- /**
- * Return the lineage outputs for the given tableName.
- *
- * @param tableName tableName
- * @return Lineage Outputs as JSON
- */
- @Override
- @GraphTransaction
- public String getOutputs(String tableName) throws AtlasException {
- LOG.info("Fetching lineage outputs for tableName={}", tableName);
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- validateTableExists(tableName);
-
- HiveWhereUsedQuery outputsQuery =
- new HiveWhereUsedQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
- HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
- SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
-
- Expressions.Expression expression = outputsQuery.expr();
- LOG.debug("Expression is [" + expression.toString() + "]");
- try {
- return discoveryService.evaluate(expression).toJson();
- } catch (Exception e) { // unable to catch ExpressionException
- throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e);
- }
- }
-
- /**
- * Return the lineage outputs graph for the given tableName.
- *
- * @param tableName tableName
- * @return Outputs Graph as JSON
- */
- @Override
- @GraphTransaction
- public String getOutputsGraph(String tableName) throws AtlasException {
- LOG.info("Fetching lineage outputs graph for tableName={}", tableName);
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- validateTableExists(tableName);
-
- HiveWhereUsedQuery outputsQuery =
- new HiveWhereUsedQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
- HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
- SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
- return outputsQuery.graph().toInstanceJson();
- }
-
- /**
- * Return the lineage inputs for the given tableName.
- *
- * @param tableName tableName
- * @return Lineage Inputs as JSON
- */
- @Override
- @GraphTransaction
- public String getInputs(String tableName) throws AtlasException {
- LOG.info("Fetching lineage inputs for tableName={}", tableName);
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- validateTableExists(tableName);
-
- HiveLineageQuery inputsQuery = new HiveLineageQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
- HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
- SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
-
- Expressions.Expression expression = inputsQuery.expr();
- LOG.debug("Expression is [" + expression.toString() + "]");
- try {
- return discoveryService.evaluate(expression).toJson();
- } catch (Exception e) { // unable to catch ExpressionException
- throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e);
- }
- }
-
- /**
- * Return the lineage inputs graph for the given tableName.
- *
- * @param tableName tableName
- * @return Inputs Graph as JSON
- */
- @Override
- @GraphTransaction
- public String getInputsGraph(String tableName) throws AtlasException {
- LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- validateTableExists(tableName);
-
- HiveLineageQuery inputsQuery = new HiveLineageQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
- HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
- SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
- return inputsQuery.graph().toInstanceJson();
- }
-
- /**
- * Return the schema for the given tableName.
- *
- * @param tableName tableName
- * @return Schema as JSON
- */
- @Override
- @GraphTransaction
- public String getSchema(String tableName) throws AtlasException {
- LOG.info("Fetching schema for tableName={}", tableName);
- ParamChecker.notEmpty(tableName, "table name cannot be null");
- String typeName = validateTableExists(tableName);
-
- final String schemaQuery =
- String.format(propertiesConf.getString(HIVE_TABLE_SCHEMA_QUERY_PREFIX + typeName), tableName);
- return discoveryService.searchByDSL(schemaQuery);
- }
-
- /**
- * Validate if indeed this is a table type and exists.
- *
- * @param tableName table name
- */
- private String validateTableExists(String tableName) throws AtlasException {
- final String tableExistsQuery = String.format(HIVE_TABLE_EXISTS_QUERY, tableName);
- GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery);
- if (!(queryResult.rows().length() > 0)) {
- throw new EntityNotFoundException(tableName + " does not exist");
- }
-
- ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0);
- return referenceable.getTypeName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
----------------------------------------------------------------------
diff --git a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
index 05dc6a4..c4621cd 100755
--- a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
+++ b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
@@ -256,21 +256,21 @@ trait SingleInstanceClosureQuery[T] extends ClosureQuery {
* @param persistenceStrategy as needed to evaluate the Closure Query.
* @param g as needed to evaluate the Closure Query.
*/
-case class HiveLineageQuery(tableTypeName : String,
- tableName : String,
- ctasTypeName : String,
- ctasInputTableAttribute : String,
- ctasOutputTableAttribute : String,
- depth : Option[Int],
- selectAttributes : Option[List[String]],
- withPath : Boolean,
- persistenceStrategy: GraphPersistenceStrategies,
- g: TitanGraph
+case class InputLineageClosureQuery(tableTypeName : String,
+ attributeToSelectInstance : String,
+ tableName : String,
+ ctasTypeName : String,
+ ctasInputTableAttribute : String,
+ ctasOutputTableAttribute : String,
+ depth : Option[Int],
+ selectAttributes : Option[List[String]],
+ withPath : Boolean,
+ persistenceStrategy: GraphPersistenceStrategies,
+ g: TitanGraph
) extends SingleInstanceClosureQuery[String] {
val closureType : String = tableTypeName
- val attributeToSelectInstance = "name"
val attributeTyp = DataTypes.STRING_TYPE
val instanceValue = tableName
@@ -296,21 +296,21 @@ case class HiveLineageQuery(tableTypeName : String,
* @param persistenceStrategy as needed to evaluate the Closure Query.
* @param g as needed to evaluate the Closure Query.
*/
-case class HiveWhereUsedQuery(tableTypeName : String,
- tableName : String,
- ctasTypeName : String,
- ctasInputTableAttribute : String,
- ctasOutputTableAttribute : String,
- depth : Option[Int],
- selectAttributes : Option[List[String]],
- withPath : Boolean,
- persistenceStrategy: GraphPersistenceStrategies,
- g: TitanGraph
+case class OutputLineageClosureQuery(tableTypeName : String,
+ attributeToSelectInstance : String,
+ tableName : String,
+ ctasTypeName : String,
+ ctasInputTableAttribute : String,
+ ctasOutputTableAttribute : String,
+ depth : Option[Int],
+ selectAttributes : Option[List[String]],
+ withPath : Boolean,
+ persistenceStrategy: GraphPersistenceStrategies,
+ g: TitanGraph
) extends SingleInstanceClosureQuery[String] {
val closureType : String = tableTypeName
- val attributeToSelectInstance = "name"
val attributeTyp = DataTypes.STRING_TYPE
val instanceValue = tableName
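Net effect of the rename: the attribute that identifies the start instance is now a constructor argument instead of the hardcoded "name", so callers choose the selection key. A hedged Java sketch of the two call styles, mirroring the construction in DataSetLineageService above (graphPersistenceStrategy, titanGraph and selectAttributes are assumed to be in scope as in that class; the name-based variant is illustrative, since the service itself resolves names to guids before traversing):

    // Select the start DataSet by its internal "__guid" property (id-based lineage).
    InputLineageClosureQuery byGuid = new InputLineageClosureQuery(
            "DataSet", "__guid", guid, "Process", "inputs", "outputs",
            Option.empty(), selectAttributes, true, graphPersistenceStrategy, titanGraph);

    // Passing "name" reproduces the old HiveLineageQuery behaviour.
    InputLineageClosureQuery byName = new InputLineageClosureQuery(
            "DataSet", "name", datasetName, "Process", "inputs", "outputs",
            Option.empty(), selectAttributes, true, graphPersistenceStrategy, titanGraph);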
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
deleted file mode 100644
index 40f0d91..0000000
--- a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.thinkaurelius.titan.core.TitanGraph;
-import com.thinkaurelius.titan.core.util.TitanCleanup;
-
-import org.apache.atlas.repository.MetadataRepository;
-import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
-import org.apache.atlas.repository.graph.GraphProvider;
-import org.apache.atlas.services.MetadataService;
-import org.apache.atlas.typesystem.ITypedReferenceableInstance;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.TypesDef;
-import org.apache.atlas.typesystem.json.TypesSerialization;
-import org.apache.atlas.typesystem.persistence.Id;
-import org.apache.atlas.typesystem.types.AttributeDefinition;
-import org.apache.atlas.typesystem.types.ClassType;
-import org.apache.atlas.typesystem.types.DataTypes;
-import org.apache.atlas.typesystem.types.EnumTypeDefinition;
-import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
-import org.apache.atlas.typesystem.types.IDataType;
-import org.apache.atlas.typesystem.types.Multiplicity;
-import org.apache.atlas.typesystem.types.StructTypeDefinition;
-import org.apache.atlas.typesystem.types.TraitType;
-import org.apache.atlas.typesystem.types.TypeSystem;
-import org.apache.atlas.typesystem.types.utils.TypesUtil;
-import org.testng.annotations.Guice;
-
-import javax.inject.Inject;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-/**
- * Base Class to set up hive types and instances for tests
- */
-@Guice(modules = RepositoryMetadataModule.class)
-public class BaseHiveRepositoryTest {
-
- @Inject
- protected MetadataService metadataService;
-
- @Inject
- protected MetadataRepository repository;
-
- @Inject
- protected GraphProvider<TitanGraph> graphProvider;
-
- protected void setUp() throws Exception {
- setUpTypes();
- new GraphBackedSearchIndexer(graphProvider);
- RequestContext.createContext();
- setupInstances();
- TestUtils.dumpGraph(graphProvider.get());
- }
-
- protected void tearDown() throws Exception {
- TypeSystem.getInstance().reset();
- try {
- graphProvider.get().shutdown();
- } catch (Exception e) {
- e.printStackTrace();
- }
- try {
- TitanCleanup.clear(graphProvider.get());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private void setUpTypes() throws Exception {
- TypesDef typesDef = createTypeDefinitions();
- String typesAsJSON = TypesSerialization.toJson(typesDef);
- metadataService.createType(typesAsJSON);
- }
-
- private static final String DATABASE_TYPE = "hive_db";
- private static final String HIVE_TABLE_TYPE = "hive_table";
- private static final String COLUMN_TYPE = "hive_column";
- private static final String HIVE_PROCESS_TYPE = "hive_process";
- private static final String STORAGE_DESC_TYPE = "StorageDesc";
- private static final String VIEW_TYPE = "View";
- private static final String PARTITION_TYPE = "hive_partition";
-
- TypesDef createTypeDefinitions() {
- HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
- .createClassTypeDef(DATABASE_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
- attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE),
- attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE));
-
- HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
- .createClassTypeDef(COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
- attrDef("dataType", DataTypes.STRING_TYPE), attrDef("comment", DataTypes.STRING_TYPE));
-
- HierarchicalTypeDefinition<ClassType> storageDescClsDef = TypesUtil
- .createClassTypeDef(STORAGE_DESC_TYPE, null,
- attrDef("location", DataTypes.STRING_TYPE),
- attrDef("inputFormat", DataTypes.STRING_TYPE), attrDef("outputFormat", DataTypes.STRING_TYPE),
- attrDef("compressed", DataTypes.STRING_TYPE, Multiplicity.REQUIRED, false, null));
-
-
- HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
- .createClassTypeDef(HIVE_TABLE_TYPE, ImmutableSet.of("DataSet"),
- attrDef("owner", DataTypes.STRING_TYPE),
- attrDef("createTime", DataTypes.DATE_TYPE),
- attrDef("lastAccessTime", DataTypes.LONG_TYPE), attrDef("tableType", DataTypes.STRING_TYPE),
- attrDef("temporary", DataTypes.BOOLEAN_TYPE),
- new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
- // todo - uncomment this, something is broken
- new AttributeDefinition("sd", STORAGE_DESC_TYPE,
- Multiplicity.REQUIRED, true, null),
- new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE),
- Multiplicity.COLLECTION, true, null));
-
- HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil
- .createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableSet.of("Process"),
- attrDef("userName", DataTypes.STRING_TYPE), attrDef("startTime", DataTypes.LONG_TYPE),
- attrDef("endTime", DataTypes.LONG_TYPE),
- attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
- attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
- attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
- attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED));
-
- HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
- .createClassTypeDef(VIEW_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
- new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
- new AttributeDefinition("inputTables", DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
- Multiplicity.COLLECTION, false, null));
-
- AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
- new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()),
- Multiplicity.OPTIONAL, false, null),
- new AttributeDefinition("table", HIVE_TABLE_TYPE, Multiplicity.REQUIRED, false, null),
- };
- HierarchicalTypeDefinition<ClassType> partClsDef =
- new HierarchicalTypeDefinition<>(ClassType.class, PARTITION_TYPE, null, null,
- attributeDefinitions);
-
- HierarchicalTypeDefinition<TraitType> dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null);
-
- HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null);
-
- HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null);
-
- HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null);
-
- HierarchicalTypeDefinition<TraitType> piiTraitDef = TypesUtil.createTraitTypeDef("PII", null);
-
- HierarchicalTypeDefinition<TraitType> jdbcTraitDef = TypesUtil.createTraitTypeDef("JdbcAccess", null);
-
- HierarchicalTypeDefinition<TraitType> logTraitDef = TypesUtil.createTraitTypeDef("Log Data", null);
-
- return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
- ImmutableList.of(dimTraitDef, factTraitDef, piiTraitDef, metricTraitDef, etlTraitDef, jdbcTraitDef, logTraitDef),
- ImmutableList.of(dbClsDef, storageDescClsDef, columnClsDef, tblClsDef, loadProcessClsDef, viewClsDef, partClsDef));
- }
-
- AttributeDefinition attrDef(String name, IDataType dT) {
- return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
- }
-
- AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) {
- return attrDef(name, dT, m, false, null);
- }
-
- AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m, boolean isComposite,
- String reverseAttributeName) {
- Preconditions.checkNotNull(name);
- Preconditions.checkNotNull(dT);
- return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName);
- }
-
- private void setupInstances() throws Exception {
- Id salesDB = database("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
-
- Referenceable sd =
- storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(
- column("time_id", "int", "time id")));
-
- List<Referenceable> salesFactColumns = ImmutableList
- .of(column("time_id", "int", "time id"),
- column("product_id", "int", "product id"),
- column("customer_id", "int", "customer id", "PII"),
- column("sales", "double", "product id", "Metric"));
-
- Id salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
-
- List<Referenceable> logFactColumns = ImmutableList
- .of(column("time_id", "int", "time id"), column("app_id", "int", "app id"),
- column("machine_id", "int", "machine id"), column("log", "string", "log data", "Log Data"));
-
- List<Referenceable> timeDimColumns = ImmutableList
- .of(column("time_id", "int", "time id"),
- column("dayOfYear", "int", "day Of Year"),
- column("weekDay", "int", "week Day"));
-
- Id timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
- "Dimension");
-
- Id reportingDB =
- database("Reporting", "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting");
-
- Id salesFactDaily =
- table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed",
- salesFactColumns, "Metric");
-
- loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim),
- ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
-
- Id logDB = database("Logging", "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging");
-
- Id loggingFactDaily =
- table("log_fact_daily_mv", "log fact daily materialized view", logDB, sd, "Tim ETL", "Managed",
- logFactColumns, "Log Data");
-
- List<Referenceable> productDimColumns = ImmutableList
- .of(column("product_id", "int", "product id"),
- column("product_name", "string", "product name"),
- column("brand_name", "int", "brand name"));
-
- Id productDim =
- table("product_dim", "product dimension table", salesDB, sd, "John Doe", "Managed", productDimColumns,
- "Dimension");
-
- view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
-
- List<Referenceable> customerDimColumns = ImmutableList.of(
- column("customer_id", "int", "customer id", "PII"),
- column("name", "string", "customer name", "PII"),
- column("address", "string", "customer address", "PII"));
-
- Id customerDim =
- table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns,
- "Dimension");
-
- view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
-
- Id salesFactMonthly =
- table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI",
- "Managed", salesFactColumns, "Metric");
-
- loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily),
- ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
-
- Id loggingFactMonthly =
- table("logging_fact_monthly_mv", "logging fact monthly materialized view", logDB, sd, "Tim ETL",
- "Managed", logFactColumns, "Log Data");
-
- loadProcess("loadLogsMonthly", "hive query for monthly summary", "Tim ETL", ImmutableList.of(loggingFactDaily),
- ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
-
- partition(new ArrayList() {{ add("2015-01-01"); }}, salesFactDaily);
- }
-
- Id database(String name, String description, String owner, String locationUri, String... traitNames)
- throws Exception {
- Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
- referenceable.set("name", name);
- referenceable.set("description", description);
- referenceable.set("owner", owner);
- referenceable.set("locationUri", locationUri);
- referenceable.set("createTime", System.currentTimeMillis());
-
- ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, DATABASE_TYPE);
- return createInstance(referenceable, clsType);
- }
-
- Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed, List<Referenceable> columns)
- throws Exception {
- Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
- referenceable.set("location", location);
- referenceable.set("inputFormat", inputFormat);
- referenceable.set("outputFormat", outputFormat);
- referenceable.set("compressed", compressed);
- referenceable.set("cols", columns);
-
- return referenceable;
- }
-
- Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
- Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
- referenceable.set("name", name);
- referenceable.set("dataType", dataType);
- referenceable.set("comment", comment);
-
- return referenceable;
- }
-
- Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
- List<Referenceable> columns, String... traitNames) throws Exception {
- Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
- referenceable.set("name", name);
- referenceable.set("description", description);
- referenceable.set("owner", owner);
- referenceable.set("tableType", tableType);
- referenceable.set("temporary", false);
- referenceable.set("createTime", new Date(System.currentTimeMillis()));
- referenceable.set("lastAccessTime", System.currentTimeMillis());
- referenceable.set("retention", System.currentTimeMillis());
-
- referenceable.set("db", dbId);
- // todo - uncomment this, something is broken
- referenceable.set("sd", sd);
- referenceable.set("columns", columns);
-
- ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_TABLE_TYPE);
- return createInstance(referenceable, clsType);
- }
-
- Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables,
- String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
- throws Exception {
- Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
- referenceable.set(AtlasClient.NAME, name);
- referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
- referenceable.set("description", description);
- referenceable.set("user", user);
- referenceable.set("startTime", System.currentTimeMillis());
- referenceable.set("endTime", System.currentTimeMillis() + 10000);
-
- referenceable.set("inputs", inputTables);
- referenceable.set("outputs", outputTables);
-
- referenceable.set("queryText", queryText);
- referenceable.set("queryPlan", queryPlan);
- referenceable.set("queryId", queryId);
- referenceable.set("queryGraph", queryGraph);
-
- ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_PROCESS_TYPE);
- return createInstance(referenceable, clsType);
- }
-
- Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
- Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
- referenceable.set("name", name);
- referenceable.set("db", dbId);
-
- referenceable.set("inputTables", inputTables);
- ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, VIEW_TYPE);
- return createInstance(referenceable, clsType);
- }
-
- Id partition(List<String> values, Id table, String... traitNames) throws Exception {
- Referenceable referenceable = new Referenceable(PARTITION_TYPE, traitNames);
- referenceable.set("values", values);
- referenceable.set("table", table);
- ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, PARTITION_TYPE);
- return createInstance(referenceable, clsType);
- }
- private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception {
- ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED);
- List<String> guids = repository.createEntities(typedInstance);
-
- // return the reference to created instance with guid
- return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
new file mode 100644
index 0000000..d1f9430
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.thinkaurelius.titan.core.TitanGraph;
+import com.thinkaurelius.titan.core.util.TitanCleanup;
+
+import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.services.MetadataService;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.json.TypesSerialization;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.typesystem.types.AttributeDefinition;
+import org.apache.atlas.typesystem.types.ClassType;
+import org.apache.atlas.typesystem.types.DataTypes;
+import org.apache.atlas.typesystem.types.EnumTypeDefinition;
+import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
+import org.apache.atlas.typesystem.types.IDataType;
+import org.apache.atlas.typesystem.types.Multiplicity;
+import org.apache.atlas.typesystem.types.StructTypeDefinition;
+import org.apache.atlas.typesystem.types.TraitType;
+import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.utils.TypesUtil;
+import org.testng.annotations.Guice;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Base Class to set up hive types and instances for tests
+ */
+@Guice(modules = RepositoryMetadataModule.class)
+public class BaseRepositoryTest {
+
+ @Inject
+ protected MetadataService metadataService;
+
+ @Inject
+ protected MetadataRepository repository;
+
+ @Inject
+ protected GraphProvider<TitanGraph> graphProvider;
+
+ protected void setUp() throws Exception {
+ setUpTypes();
+ new GraphBackedSearchIndexer(graphProvider);
+ RequestContext.createContext();
+ setupInstances();
+ TestUtils.dumpGraph(graphProvider.get());
+ }
+
+ protected void tearDown() throws Exception {
+ TypeSystem.getInstance().reset();
+ try {
+ graphProvider.get().shutdown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ try {
+ TitanCleanup.clear(graphProvider.get());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
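+
+    // Usage sketch (illustrative, not part of this patch): a concrete test class would
+    // typically drive these hooks from its TestNG lifecycle methods, e.g.
+    //
+    //   @BeforeClass
+    //   public void initialize() throws Exception {
+    //       super.setUp();      // registers the types and creates the sample instances
+    //   }
+    //
+    //   @AfterClass
+    //   public void cleanup() throws Exception {
+    //       super.tearDown();   // resets the type system and clears the Titan graph
+    //   }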
+
+ private void setUpTypes() throws Exception {
+ TypesDef typesDef = createTypeDefinitions();
+ String typesAsJSON = TypesSerialization.toJson(typesDef);
+ metadataService.createType(typesAsJSON);
+ }
+
+ protected static final String DATABASE_TYPE = "hive_db";
+ protected static final String HIVE_TABLE_TYPE = "hive_table";
+ private static final String COLUMN_TYPE = "hive_column";
+ private static final String HIVE_PROCESS_TYPE = "hive_process";
+ private static final String STORAGE_DESC_TYPE = "StorageDesc";
+ private static final String VIEW_TYPE = "View";
+ private static final String PARTITION_TYPE = "hive_partition";
+
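+    // Note: "DataSet" and "Process", referenced as supertypes below, are assumed to be
+    // Atlas base types registered by the metadata service, and so are not defined here.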
+ TypesDef createTypeDefinitions() {
+ HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
+ .createClassTypeDef(DATABASE_TYPE, null,
+ TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
+ attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE),
+ attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE));
+
+ HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
+ .createClassTypeDef(COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
+ attrDef("dataType", DataTypes.STRING_TYPE), attrDef("comment", DataTypes.STRING_TYPE));
+
+ HierarchicalTypeDefinition<ClassType> storageDescClsDef = TypesUtil
+ .createClassTypeDef(STORAGE_DESC_TYPE, null,
+ attrDef("location", DataTypes.STRING_TYPE),
+ attrDef("inputFormat", DataTypes.STRING_TYPE), attrDef("outputFormat", DataTypes.STRING_TYPE),
+ attrDef("compressed", DataTypes.STRING_TYPE, Multiplicity.REQUIRED, false, null));
+
+ HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
+ .createClassTypeDef(HIVE_TABLE_TYPE, ImmutableSet.of("DataSet"),
+ attrDef("owner", DataTypes.STRING_TYPE),
+ attrDef("createTime", DataTypes.DATE_TYPE),
+ attrDef("lastAccessTime", DataTypes.LONG_TYPE), attrDef("tableType", DataTypes.STRING_TYPE),
+ attrDef("temporary", DataTypes.BOOLEAN_TYPE),
+ new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
+                    // "sd" is a composite reference (isComposite = true): the storage
+                    // descriptor instance is owned by the table
+                    new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),
+ new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE),
+ Multiplicity.COLLECTION, true, null));
+
+ HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil
+ .createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableSet.of("Process"),
+ attrDef("userName", DataTypes.STRING_TYPE), attrDef("startTime", DataTypes.LONG_TYPE),
+ attrDef("endTime", DataTypes.LONG_TYPE),
+ attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
+ attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
+ attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
+ attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED));
+
+ HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
+ .createClassTypeDef(VIEW_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
+ new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
+ new AttributeDefinition("inputTables", DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
+ Multiplicity.COLLECTION, false, null));
+
+ AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
+ new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()),
+ Multiplicity.OPTIONAL, false, null),
+ new AttributeDefinition("table", HIVE_TABLE_TYPE, Multiplicity.REQUIRED, false, null),
+ };
+ HierarchicalTypeDefinition<ClassType> partClsDef =
+ new HierarchicalTypeDefinition<>(ClassType.class, PARTITION_TYPE, null, null,
+ attributeDefinitions);
+
+ HierarchicalTypeDefinition<TraitType> dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null);
+
+ HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null);
+
+ HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null);
+
+ HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null);
+
+ HierarchicalTypeDefinition<TraitType> piiTraitDef = TypesUtil.createTraitTypeDef("PII", null);
+
+ HierarchicalTypeDefinition<TraitType> jdbcTraitDef = TypesUtil.createTraitTypeDef("JdbcAccess", null);
+
+ HierarchicalTypeDefinition<TraitType> logTraitDef = TypesUtil.createTraitTypeDef("Log Data", null);
+
+ return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
+ ImmutableList.of(dimTraitDef, factTraitDef, piiTraitDef, metricTraitDef, etlTraitDef, jdbcTraitDef, logTraitDef),
+ ImmutableList.of(dbClsDef, storageDescClsDef, columnClsDef, tblClsDef, loadProcessClsDef, viewClsDef, partClsDef));
+ }
+
+ AttributeDefinition attrDef(String name, IDataType dT) {
+ return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
+ }
+
+ AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) {
+ return attrDef(name, dT, m, false, null);
+ }
+
+ AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m, boolean isComposite,
+ String reverseAttributeName) {
+ Preconditions.checkNotNull(name);
+ Preconditions.checkNotNull(dT);
+ return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName);
+ }
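+
+    // For example (illustrative): attrDef("comment", DataTypes.STRING_TYPE) expands to
+    //   new AttributeDefinition("comment", "string", Multiplicity.OPTIONAL, false, null)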
+
+ private void setupInstances() throws Exception {
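+        // Overview of the sample lineage wired up below:
+        //   sales_fact, time_dim --(loadSalesDaily)--> sales_fact_daily_mv
+        //   sales_fact_daily_mv --(loadSalesMonthly)--> sales_fact_monthly_mv
+        //   log_fact_daily_mv --(loadLogsMonthly)--> logging_fact_monthly_mv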
+ Id salesDB = database("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
+
+ Referenceable sd =
+ storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(
+ column("time_id", "int", "time id")));
+
+ List<Referenceable> salesFactColumns = ImmutableList
+ .of(column("time_id", "int", "time id"),
+ column("product_id", "int", "product id"),
+ column("customer_id", "int", "customer id", "PII"),
+ column("sales", "double", "product id", "Metric"));
+
+ Id salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
+
+ List<Referenceable> logFactColumns = ImmutableList
+ .of(column("time_id", "int", "time id"), column("app_id", "int", "app id"),
+ column("machine_id", "int", "machine id"), column("log", "string", "log data", "Log Data"));
+
+ List<Referenceable> timeDimColumns = ImmutableList
+ .of(column("time_id", "int", "time id"),
+ column("dayOfYear", "int", "day Of Year"),
+ column("weekDay", "int", "week Day"));
+
+ Id timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
+ "Dimension");
+
+ Id reportingDB =
+ database("Reporting", "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting");
+
+ Id salesFactDaily =
+ table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed",
+ salesFactColumns, "Metric");
+
+ loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim),
+ ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
+
+ Id logDB = database("Logging", "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging");
+
+ Id loggingFactDaily =
+ table("log_fact_daily_mv", "log fact daily materialized view", logDB, sd, "Tim ETL", "Managed",
+ logFactColumns, "Log Data");
+
+ List<Referenceable> productDimColumns = ImmutableList
+ .of(column("product_id", "int", "product id"),
+ column("product_name", "string", "product name"),
+ column("brand_name", "int", "brand name"));
+
+ Id productDim =
+ table("product_dim", "product dimension table", salesDB, sd, "John Doe", "Managed", productDimColumns,
+ "Dimension");
+
+ view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
+
+ List<Referenceable> customerDimColumns = ImmutableList.of(
+ column("customer_id", "int", "customer id", "PII"),
+ column("name", "string", "customer name", "PII"),
+ column("address", "string", "customer address", "PII"));
+
+ Id customerDim =
+ table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns,
+ "Dimension");
+
+ view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
+
+ Id salesFactMonthly =
+ table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI",
+ "Managed", salesFactColumns, "Metric");
+
+ loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily),
+ ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
+
+ Id loggingFactMonthly =
+ table("logging_fact_monthly_mv", "logging fact monthly materialized view", logDB, sd, "Tim ETL",
+ "Managed", logFactColumns, "Log Data");
+
+ loadProcess("loadLogsMonthly", "hive query for monthly summary", "Tim ETL", ImmutableList.of(loggingFactDaily),
+ ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
+
+        partition(ImmutableList.of("2015-01-01"), salesFactDaily);
+ }
+
+ Id database(String name, String description, String owner, String locationUri, String... traitNames)
+ throws Exception {
+ Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set("description", description);
+ referenceable.set("owner", owner);
+ referenceable.set("locationUri", locationUri);
+ referenceable.set("createTime", System.currentTimeMillis());
+
+ ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, DATABASE_TYPE);
+ return createInstance(referenceable, clsType);
+ }
+
+ protected Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed, List<Referenceable> columns)
+ throws Exception {
+ Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
+ referenceable.set("location", location);
+ referenceable.set("inputFormat", inputFormat);
+ referenceable.set("outputFormat", outputFormat);
+ referenceable.set("compressed", compressed);
+ referenceable.set("cols", columns);
+
+ return referenceable;
+ }
+
+ protected Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
+ Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set("dataType", dataType);
+ referenceable.set("comment", comment);
+
+ return referenceable;
+ }
+
+ protected Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
+ List<Referenceable> columns, String... traitNames) throws Exception {
+ Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set("description", description);
+ referenceable.set("owner", owner);
+ referenceable.set("tableType", tableType);
+ referenceable.set("temporary", false);
+ referenceable.set("createTime", new Date(System.currentTimeMillis()));
+ referenceable.set("lastAccessTime", System.currentTimeMillis());
+ referenceable.set("retention", System.currentTimeMillis());
+
+ referenceable.set("db", dbId);
+        // "sd" is composite, so the full storage descriptor instance is set rather than an Id
+        referenceable.set("sd", sd);
+ referenceable.set("columns", columns);
+
+ ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_TABLE_TYPE);
+ return createInstance(referenceable, clsType);
+ }
+
+ protected Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables,
+ String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
+ throws Exception {
+ Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set("qualifiedName", name);
+ referenceable.set("description", description);
+ referenceable.set("user", user);
+ referenceable.set("startTime", System.currentTimeMillis());
+ referenceable.set("endTime", System.currentTimeMillis() + 10000);
+
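+        // "inputs" and "outputs" are assumed to be declared on the built-in Process supertype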
+ referenceable.set("inputs", inputTables);
+ referenceable.set("outputs", outputTables);
+
+ referenceable.set("queryText", queryText);
+ referenceable.set("queryPlan", queryPlan);
+ referenceable.set("queryId", queryId);
+ referenceable.set("queryGraph", queryGraph);
+
+ ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_PROCESS_TYPE);
+ return createInstance(referenceable, clsType);
+ }
+
+ Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
+ Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
+ referenceable.set("name", name);
+ referenceable.set("db", dbId);
+
+ referenceable.set("inputTables", inputTables);
+ ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, VIEW_TYPE);
+ return createInstance(referenceable, clsType);
+ }
+
+ Id partition(List<String> values, Id table, String... traitNames) throws Exception {
+ Referenceable referenceable = new Referenceable(PARTITION_TYPE, traitNames);
+ referenceable.set("values", values);
+ referenceable.set("table", table);
+ ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, PARTITION_TYPE);
+ return createInstance(referenceable, clsType);
+    }
+
+    private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception {
+ ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED);
+ List<String> guids = repository.createEntities(typedInstance);
+
+        // return a reference to the created instance; the last guid corresponds to this instance
+ return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
+ }
+}