You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@atlas.apache.org by sh...@apache.org on 2016/01/29 07:05:35 UTC
incubator-atlas git commit: ATLAS-415 Hive import fails when
importing a table that is already imported without StorageDescriptor
information (yhemanth via shwethags)
Repository: incubator-atlas
Updated Branches:
refs/heads/master 7b07a222c -> 8c4a7faef
ATLAS-415 Hive import fails when importing a table that is already imported without StorageDescriptor information (yhemanth via shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/8c4a7fae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/8c4a7fae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/8c4a7fae
Branch: refs/heads/master
Commit: 8c4a7faef286a5ae5cf6f986f6796da60f0251ab
Parents: 7b07a22
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Fri Jan 29 11:35:29 2016 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Fri Jan 29 11:35:29 2016 +0530
----------------------------------------------------------------------
addons/hive-bridge/pom.xml | 5 +
.../atlas/hive/bridge/HiveMetaStoreBridge.java | 243 +++++++++++++------
.../hive/bridge/HiveMetaStoreBridgeTest.java | 206 ++++++++++++++++
pom.xml | 2 +-
release-log.txt | 1 +
5 files changed, 388 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8c4a7fae/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index 20c25d1..f1cb130 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -127,6 +127,11 @@
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8c4a7fae/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 40babe5..4680a3c 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -52,12 +52,16 @@ import java.util.List;
/**
* A Bridge Utility that imports metadata from the Hive Meta Store
- * and registers then in Atlas.
+ * and registers them in Atlas.
*/
public class HiveMetaStoreBridge {
private static final String DEFAULT_DGI_URL = "http://localhost:21000/";
public static final String HIVE_CLUSTER_NAME = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary";
+ public static final String DESCRIPTION_ATTR = "description";
+ public static final String TABLE_TYPE_ATTR = "tableType";
+ public static final String SEARCH_ENTRY_GUID_ATTR = "__guid";
+ public static final String LAST_ACCESS_TIME_ATTR = "lastAccessTime";
private final String clusterName;
public static final String ATLAS_ENDPOINT = "atlas.rest.address";
@@ -67,6 +71,12 @@ public class HiveMetaStoreBridge {
public final Hive hiveClient;
private final AtlasClient atlasClient;
+ /**
+ * Construct a HiveMetaStoreBridge.
+ * @param hiveConf {@link HiveConf} for Hive component in the cluster
+ * @param atlasConf {@link Configuration} for Atlas component in the cluster
+ * @throws Exception
+ */
public HiveMetaStoreBridge(HiveConf hiveConf, Configuration atlasConf) throws Exception {
this(hiveConf, atlasConf, null, null);
}
@@ -77,21 +87,28 @@ public class HiveMetaStoreBridge {
/**
* Construct a HiveMetaStoreBridge.
- * @param hiveConf hive conf
+ * @param hiveConf {@link HiveConf} for Hive component in the cluster
+ * @param doAsUser The user accessing Atlas service
+ * @param ugi {@link UserGroupInformation} representing the Atlas service
*/
public HiveMetaStoreBridge(HiveConf hiveConf, Configuration atlasConf, String doAsUser,
UserGroupInformation ugi) throws Exception {
- clusterName = hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
- hiveClient = Hive.get(hiveConf);
+ this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME),
+ Hive.get(hiveConf),
+ new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser));
+ }
- atlasClient = new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser);
+ HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {
+ this.clusterName = clusterName;
+ this.hiveClient = hiveClient;
+ this.atlasClient = atlasClient;
}
- public AtlasClient getAtlasClient() {
+ private AtlasClient getAtlasClient() {
return atlasClient;
}
- public void importHiveMetadata() throws Exception {
+ void importHiveMetadata() throws Exception {
LOG.info("Importing hive metadata");
importDatabases();
}
@@ -106,27 +123,13 @@ public class HiveMetaStoreBridge {
}
/**
- * Creates db entity
- * @param hiveDB
- * @return
+ * Create a Hive Database entity
+ * @param hiveDB The Hive {@link Database} object from which to map properties
+ * @return new Hive Database entity
* @throws HiveException
*/
public Referenceable createDBInstance(Database hiveDB) throws HiveException {
- LOG.info("Importing objects from databaseName : " + hiveDB.getName());
-
- Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
- String dbName = hiveDB.getName().toLowerCase();
- dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName));
- dbRef.set(HiveDataModelGenerator.NAME, dbName);
- dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
- dbRef.set("description", hiveDB.getDescription());
- dbRef.set("locationUri", hiveDB.getLocationUri());
- dbRef.set("parameters", hiveDB.getParameters());
- dbRef.set("ownerName", hiveDB.getOwnerName());
- if (hiveDB.getOwnerType() != null) {
- dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
- }
- return dbRef;
+ return createOrUpdateDBInstance(hiveDB, null);
}
/**
@@ -137,12 +140,34 @@ public class HiveMetaStoreBridge {
*/
private Referenceable registerDatabase(String databaseName) throws Exception {
Referenceable dbRef = getDatabaseReference(clusterName, databaseName);
+ Database db = hiveClient.getDatabase(databaseName);
if (dbRef == null) {
- Database db = hiveClient.getDatabase(databaseName);
dbRef = createDBInstance(db);
dbRef = registerInstance(dbRef);
} else {
- LOG.info("Database {} is already registered with id {}", databaseName, dbRef.getId().id);
+ LOG.info("Database {} is already registered with id {}. Updating it.", databaseName, dbRef.getId().id);
+ dbRef = createOrUpdateDBInstance(db, dbRef);
+ updateInstance(dbRef);
+ }
+ return dbRef;
+ }
+
+ private Referenceable createOrUpdateDBInstance(Database hiveDB, Referenceable dbRef) {
+ LOG.info("Importing objects from databaseName : " + hiveDB.getName());
+
+ if (dbRef == null) {
+ dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
+ }
+ String dbName = hiveDB.getName().toLowerCase();
+ dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName));
+ dbRef.set(HiveDataModelGenerator.NAME, dbName);
+ dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+ dbRef.set(DESCRIPTION_ATTR, hiveDB.getDescription());
+ dbRef.set("locationUri", hiveDB.getLocationUri());
+ dbRef.set("parameters", hiveDB.getParameters());
+ dbRef.set("ownerName", hiveDB.getOwnerName());
+ if (hiveDB.getOwnerType() != null) {
+ dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
}
return dbRef;
}
@@ -153,7 +178,7 @@ public class HiveMetaStoreBridge {
* @return
* @throws Exception
*/
- public Referenceable registerInstance(Referenceable referenceable) throws Exception {
+ private Referenceable registerInstance(Referenceable referenceable) throws Exception {
String typeName = referenceable.getTypeName();
LOG.debug("creating instance of type " + typeName);
@@ -176,11 +201,15 @@ public class HiveMetaStoreBridge {
LOG.debug("Getting reference for database {}", databaseName);
String typeName = HiveDataTypes.HIVE_DB.getName();
- String dslQuery = String.format("%s where %s = '%s' and %s = '%s'", typeName, HiveDataModelGenerator.NAME,
- databaseName.toLowerCase(), HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+ String dslQuery = getDatabaseDSLQuery(clusterName, databaseName, typeName);
return getEntityReferenceFromDSL(typeName, dslQuery);
}
+ static String getDatabaseDSLQuery(String clusterName, String databaseName, String typeName) {
+ return String.format("%s where %s = '%s' and %s = '%s'", typeName, HiveDataModelGenerator.NAME,
+ databaseName.toLowerCase(), HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+ }
+
private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception {
AtlasClient dgiClient = getAtlasClient();
JSONArray results = dgiClient.searchByDSL(dslQuery);
@@ -198,6 +227,12 @@ public class HiveMetaStoreBridge {
}
}
+ /**
+ * Construct the qualified name used to uniquely identify a Database instance in Atlas.
+ * @param clusterName Name of the cluster to which the Hive component belongs
+ * @param dbName Name of the Hive database
+ * @return Unique qualified name to identify the Database instance in Atlas.
+ */
public static String getDBQualifiedName(String clusterName, String dbName) {
return String.format("%s@%s", dbName.toLowerCase(), clusterName);
}
@@ -233,71 +268,109 @@ public class HiveMetaStoreBridge {
LOG.debug("Getting reference for table {}.{}", dbName, tableName);
String typeName = HiveDataTypes.HIVE_TABLE.getName();
- String entityName = getTableQualifiedName(clusterName, dbName, tableName);
- String dslQuery = String.format("%s as t where name = '%s'", typeName, entityName);
+ String dslQuery = getTableDSLQuery(getClusterName(), dbName, tableName, typeName);
return getEntityReferenceFromDSL(typeName, dslQuery);
}
+ static String getTableDSLQuery(String clusterName, String dbName, String tableName, String typeName) {
+ String entityName = getTableQualifiedName(clusterName, dbName, tableName);
+ return String.format("%s as t where name = '%s'", typeName, entityName);
+ }
+
+ /**
+ * Construct the qualified name used to uniquely identify a Table instance in Atlas.
+ * @param clusterName Name of the cluster to which the Hive component belongs
+ * @param dbName Name of the Hive database to which the Table belongs
+ * @param tableName Name of the Hive table
+ * @return Unique qualified name to identify the Table instance in Atlas.
+ */
public static String getTableQualifiedName(String clusterName, String dbName, String tableName) {
return String.format("%s.%s@%s", dbName.toLowerCase(), tableName.toLowerCase(), clusterName);
}
+ /**
+ * Create a new table instance in Atlas
+ * @param dbReference reference to a created Hive database {@link Referenceable} to which this table belongs
+ * @param hiveTable reference to the Hive {@link Table} from which to map properties
+ * @return Newly created Hive reference
+ * @throws Exception
+ */
public Referenceable createTableInstance(Referenceable dbReference, Table hiveTable)
throws Exception {
+ return createOrUpdateTableInstance(dbReference, null, hiveTable);
+ }
+
+ private Referenceable createOrUpdateTableInstance(Referenceable dbReference, Referenceable tableReference,
+ Table hiveTable) throws Exception {
LOG.info("Importing objects from {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
- Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+ if (tableReference == null) {
+ tableReference = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+ }
String tableQualifiedName = getTableQualifiedName(clusterName, hiveTable.getDbName(), hiveTable.getTableName());
- tableRef.set(HiveDataModelGenerator.NAME, tableQualifiedName);
- tableRef.set(HiveDataModelGenerator.TABLE_NAME, hiveTable.getTableName().toLowerCase());
- tableRef.set("owner", hiveTable.getOwner());
+ tableReference.set(HiveDataModelGenerator.NAME, tableQualifiedName);
+ tableReference.set(HiveDataModelGenerator.TABLE_NAME, hiveTable.getTableName().toLowerCase());
+ tableReference.set("owner", hiveTable.getOwner());
- tableRef.set("createTime", hiveTable.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME));
- tableRef.set("lastAccessTime", hiveTable.getLastAccessTime());
- tableRef.set("retention", hiveTable.getRetention());
+ tableReference.set("createTime", hiveTable.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME));
+ tableReference.set("lastAccessTime", hiveTable.getLastAccessTime());
+ tableReference.set("retention", hiveTable.getRetention());
- tableRef.set(HiveDataModelGenerator.COMMENT, hiveTable.getParameters().get(HiveDataModelGenerator.COMMENT));
+ tableReference.set(HiveDataModelGenerator.COMMENT, hiveTable.getParameters().get(HiveDataModelGenerator.COMMENT));
// add reference to the database
- tableRef.set(HiveDataModelGenerator.DB, dbReference);
+ tableReference.set(HiveDataModelGenerator.DB, dbReference);
- tableRef.set("columns", getColumns(hiveTable.getCols(), tableQualifiedName));
+ tableReference.set("columns", getColumns(hiveTable.getCols(), tableQualifiedName));
// add reference to the StorageDescriptor
Referenceable sdReferenceable = fillStorageDescStruct(hiveTable.getSd(), tableQualifiedName, tableQualifiedName);
- tableRef.set("sd", sdReferenceable);
+ tableReference.set("sd", sdReferenceable);
// add reference to the Partition Keys
List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys(), tableQualifiedName);
- tableRef.set("partitionKeys", partKeys);
+ tableReference.set("partitionKeys", partKeys);
- tableRef.set("parameters", hiveTable.getParameters());
+ tableReference.set("parameters", hiveTable.getParameters());
if (hiveTable.getViewOriginalText() != null) {
- tableRef.set("viewOriginalText", hiveTable.getViewOriginalText());
+ tableReference.set("viewOriginalText", hiveTable.getViewOriginalText());
}
if (hiveTable.getViewExpandedText() != null) {
- tableRef.set("viewExpandedText", hiveTable.getViewExpandedText());
+ tableReference.set("viewExpandedText", hiveTable.getViewExpandedText());
}
- tableRef.set("tableType", hiveTable.getTableType().name());
- tableRef.set("temporary", hiveTable.isTemporary());
- return tableRef;
+ tableReference.set(TABLE_TYPE_ATTR, hiveTable.getTableType().name());
+ tableReference.set("temporary", hiveTable.isTemporary());
+ return tableReference;
}
private Referenceable registerTable(Referenceable dbReference, Table table) throws Exception {
String dbName = table.getDbName();
String tableName = table.getTableName();
LOG.info("Attempting to register table [" + tableName + "]");
- Referenceable tableRef = getTableReference(dbName, tableName);
- if (tableRef == null) {
- tableRef = createTableInstance(dbReference, table);
- tableRef = registerInstance(tableRef);
+ Referenceable tableReference = getTableReference(dbName, tableName);
+ if (tableReference == null) {
+ tableReference = createTableInstance(dbReference, table);
+ tableReference = registerInstance(tableReference);
} else {
- LOG.info("Table {}.{} is already registered with id {}", dbName, tableName, tableRef.getId().id);
+ LOG.info("Table {}.{} is already registered with id {}. Updating entity.", dbName, tableName,
+ tableReference.getId().id);
+ tableReference = createOrUpdateTableInstance(dbReference, tableReference, table);
+ updateInstance(tableReference);
}
- return tableRef;
+ return tableReference;
+ }
+
+ private void updateInstance(Referenceable referenceable) throws AtlasServiceException {
+ String typeName = referenceable.getTypeName();
+ LOG.debug("updating instance of type " + typeName);
+
+ String entityJSON = InstanceSerialization.toJson(referenceable, true);
+ LOG.debug("Updating entity {} = {}", referenceable.getTypeName(), entityJSON);
+
+ atlasClient.updateEntity(referenceable.getId().id, referenceable);
}
@@ -308,14 +381,13 @@ public class HiveMetaStoreBridge {
if (results.length() == 0) {
return null;
}
- String guid = results.getJSONObject(0).getString("__guid");
+ String guid = results.getJSONObject(0).getString(SEARCH_ENTRY_GUID_ATTR);
return new Referenceable(guid, typeName, null);
}
private Referenceable getPartitionReference(String dbName, String tableName, List<String> values) throws Exception {
- String valuesStr = "['" + StringUtils.join(values, "', '") + "']";
+ String valuesStr = joinPartitionValues(values);
LOG.debug("Getting reference for partition for {}.{} with values {}", dbName, tableName, valuesStr);
- String typeName = HiveDataTypes.HIVE_PARTITION.getName();
//todo replace gremlin with DSL
// String dslQuery = String.format("%s as p where values = %s, tableName where name = '%s', "
@@ -323,14 +395,23 @@ public class HiveMetaStoreBridge {
// tableName,
// dbName, clusterName);
- String datasetType = AtlasClient.DATA_SET_SUPER_TYPE;
String tableEntityName = getTableQualifiedName(clusterName, dbName, tableName);
- String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
+ String gremlinQuery = getPartitionGremlinQuery(valuesStr, tableEntityName);
+
+ return getEntityReferenceFromGremlin(HiveDataTypes.HIVE_PARTITION.getName(), gremlinQuery);
+ }
+
+ static String joinPartitionValues(List<String> values) {
+ return "['" + StringUtils.join(values, "', '") + "']";
+ }
+
+ static String getPartitionGremlinQuery(String valuesStr, String tableEntityName) {
+ String typeName = HiveDataTypes.HIVE_PARTITION.getName();
+ String datasetType = AtlasClient.DATA_SET_SUPER_TYPE;
+ return String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
+ "out('__%s.table').has('%s.name', '%s').back('p').toList()", typeName, typeName, valuesStr,
typeName, datasetType, tableEntityName);
-
- return getEntityReferenceFromGremlin(typeName, gremlinQuery);
}
private Referenceable getSDForTable(String dbName, String tableName) throws Exception {
@@ -369,15 +450,22 @@ public class HiveMetaStoreBridge {
partRef = createPartitionReferenceable(tableReferenceable, sdReferenceable, hivePart);
partRef = registerInstance(partRef);
} else {
- LOG.info("Partition {}.{} with values {} is already registered with id {}", dbName, tableName,
+ LOG.info("Partition {}.{} with values {} is already registered with id {}. Updating entity",
+ dbName, tableName,
StringUtils.join(hivePart.getValues(), ","), partRef.getId().id);
+ partRef =
+ createOrUpdatePartitionReferenceable(tableReferenceable, sdReferenceable, hivePart, partRef);
+ updateInstance(partRef);
}
return partRef;
}
- public Referenceable createPartitionReferenceable(Referenceable tableReferenceable, Referenceable sdReferenceable,
- Partition hivePart) {
- Referenceable partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName());
+ private Referenceable createOrUpdatePartitionReferenceable(Referenceable tableReferenceable,
+ Referenceable sdReferenceable,
+ Partition hivePart, Referenceable partRef) {
+ if (partRef == null) {
+ partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName());
+ }
partRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getPartitionQualifiedName(hivePart));
partRef.set("values", hivePart.getValues());
@@ -385,7 +473,7 @@ public class HiveMetaStoreBridge {
//todo fix
partRef.set("createTime", hivePart.getLastAccessTime());
- partRef.set("lastAccessTime", hivePart.getLastAccessTime());
+ partRef.set(LAST_ACCESS_TIME_ATTR, hivePart.getLastAccessTime());
// sdStruct = fillStorageDescStruct(hivePart.getSd());
// Instead of creating copies of the sdstruct for partitions we are reusing existing
@@ -396,6 +484,18 @@ public class HiveMetaStoreBridge {
return partRef;
}
+ /**
+ * Create a Hive partition instance in Atlas
+ * @param tableReferenceable The Hive Table {@link Referenceable} to which this partition belongs.
+ * @param sdReferenceable The Storage descriptor {@link Referenceable} for this table.
+ * @param hivePart The Hive {@link Partition} object being created
+ * @return Newly created Hive partition instance
+ */
+ public Referenceable createPartitionReferenceable(Referenceable tableReferenceable, Referenceable sdReferenceable,
+ Partition hivePart) {
+ return createOrUpdatePartitionReferenceable(tableReferenceable, sdReferenceable, hivePart, null);
+ }
+
private String getPartitionQualifiedName(Partition partition) {
return String.format("%s.%s.%s@%s", partition.getTable().getDbName(),
partition.getTable().getTableName(), StringUtils.join(partition.getValues(), "-"), clusterName);
@@ -480,6 +580,13 @@ public class HiveMetaStoreBridge {
return colList;
}
+ /**
+ * Register the Hive DataModel in Atlas, if not already defined.
+ *
+ * The method checks for the presence of the type {@link HiveDataTypes#HIVE_PROCESS} with the Atlas server.
+ * If this type is defined, then we assume the Hive DataModel is registered.
+ * @throws Exception
+ */
public synchronized void registerHiveDataModel() throws Exception {
HiveDataModelGenerator dataModelGenerator = new HiveDataModelGenerator();
AtlasClient dgiClient = getAtlasClient();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8c4a7fae/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
new file mode 100644
index 0000000..f8cfb71
--- /dev/null
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hive.bridge;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Verifies that importing Hive metadata calls updateEntity for databases, tables and partitions already registered in Atlas. */
+public class HiveMetaStoreBridgeTest {
+
+ private static final String TEST_DB_NAME = "default";
+ public static final String CLUSTER_NAME = "primary";
+ public static final String TEST_TABLE_NAME = "test_table";
+
+ @Mock
+ private Hive hiveClient;
+
+ @Mock
+ private AtlasClient atlasClient;
+
+ @BeforeMethod
+ public void initializeMocks() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testImportThatUpdatesRegisteredDatabase() throws Exception {
+ // setup database
+ when(hiveClient.getAllDatabases()).thenReturn(Arrays.asList(new String[]{TEST_DB_NAME}));
+ String description = "This is a default database";
+ when(hiveClient.getDatabase(TEST_DB_NAME)).thenReturn(
+ new Database(TEST_DB_NAME, description, "/user/hive/default", null));
+ when(hiveClient.getAllTables(TEST_DB_NAME)).thenReturn(Arrays.asList(new String[]{}));
+
+ returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+
+ HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
+ bridge.importHiveMetadata();
+
+ // verify update is called
+ verify(atlasClient).updateEntity(eq("72e06b34-9151-4023-aa9d-b82103a50e76"),
+ (Referenceable) argThat(new MatchesReferenceableProperty(HiveMetaStoreBridge.DESCRIPTION_ATTR, description)));
+ }
+
+ @Test
+ public void testImportThatUpdatesRegisteredTable() throws Exception {
+ setupDB(hiveClient, TEST_DB_NAME);
+
+ setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+
+ returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+
+ // return existing table
+ when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME,
+ HiveDataTypes.HIVE_TABLE.getName()))).thenReturn(
+ getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+ when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+
+ HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
+ bridge.importHiveMetadata();
+
+ // verify update is called on table
+ verify(atlasClient).updateEntity(eq("82e06b34-9151-4023-aa9d-b82103a50e77"),
+ (Referenceable) argThat(new MatchesReferenceableProperty(HiveMetaStoreBridge.TABLE_TYPE_ATTR,
+ TableType.EXTERNAL_TABLE.name())));
+ }
+
+ private void returnExistingDatabase(String databaseName, AtlasClient atlasClient, String clusterName)
+ throws AtlasServiceException, JSONException {
+ when(atlasClient.searchByDSL(HiveMetaStoreBridge.getDatabaseDSLQuery(clusterName, databaseName,
+ HiveDataTypes.HIVE_DB.getName()))).thenReturn(
+ getEntityReference("72e06b34-9151-4023-aa9d-b82103a50e76"));
+ }
+
+ private Table setupTable(Hive hiveClient, String databaseName, String tableName) throws HiveException {
+ when(hiveClient.getAllTables(databaseName)).thenReturn(Arrays.asList(new String[]{tableName}));
+ Table testTable = createTestTable(databaseName, tableName);
+ when(hiveClient.getTable(databaseName, tableName)).thenReturn(testTable);
+ return testTable;
+ }
+
+ private void setupDB(Hive hiveClient, String databaseName) throws HiveException {
+ when(hiveClient.getAllDatabases()).thenReturn(Arrays.asList(new String[]{databaseName}));
+ when(hiveClient.getDatabase(databaseName)).thenReturn(
+ new Database(databaseName, "Default database", "/user/hive/default", null));
+ }
+
+ @Test
+ public void testImportThatUpdatesRegisteredPartition() throws Exception {
+ setupDB(hiveClient, TEST_DB_NAME);
+ Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+
+ returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+
+ when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME,
+ HiveDataTypes.HIVE_TABLE.getName()))).thenReturn(
+ getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+ when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+
+ Partition partition = mock(Partition.class);
+ when(partition.getTable()).thenReturn(hiveTable);
+ List<String> partitionValues = Arrays.asList(new String[]{"name", "location"});
+ when(partition.getValues()).thenReturn(partitionValues);
+ int lastAccessTime = 1234512345;
+ when(partition.getLastAccessTime()).thenReturn(lastAccessTime);
+
+ when(hiveClient.getPartitions(hiveTable)).thenReturn(Arrays.asList(new Partition[]{partition}));
+
+ when(atlasClient.searchByGremlin(
+ HiveMetaStoreBridge.getPartitionGremlinQuery(
+ HiveMetaStoreBridge.joinPartitionValues(partitionValues),
+ HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))).
+ thenReturn(getPartitionReference("9ae06b34-9151-3043-aa9d-b82103a50e99"));
+
+ HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
+ bridge.importHiveMetadata();
+
+ verify(atlasClient).updateEntity(eq("9ae06b34-9151-3043-aa9d-b82103a50e99"),
+ (Referenceable) argThat(new MatchesReferenceableProperty(HiveMetaStoreBridge.LAST_ACCESS_TIME_ATTR,
+ Integer.valueOf(lastAccessTime))));
+ }
+
+ private JSONArray getPartitionReference(String id) throws JSONException {
+ JSONObject resultEntry = new JSONObject();
+ resultEntry.put(HiveMetaStoreBridge.SEARCH_ENTRY_GUID_ATTR, id);
+ JSONArray results = new JSONArray();
+ results.put(resultEntry);
+ return results;
+ }
+
+ private JSONArray getEntityReference(String id) throws JSONException {
+ return new JSONArray(String.format("[{\"$id$\":{\"id\":\"%s\"}}]", id));
+ }
+
+ private Referenceable createTableReference() {
+ Referenceable tableReference = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+ Referenceable sdReference = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName());
+ tableReference.set("sd", sdReference);
+ return tableReference;
+ }
+
+ private Table createTestTable(String databaseName, String tableName) throws HiveException {
+ Table table = new Table(databaseName, tableName);
+ table.setInputFormatClass(TextInputFormat.class);
+ table.setTableType(TableType.EXTERNAL_TABLE);
+ return table;
+ }
+
+ /** Matches a {@link Referenceable} argument whose {@code attrName} attribute equals {@code attrValue}; any other argument type does not match. */
+ private static class MatchesReferenceableProperty extends ArgumentMatcher<Object> {
+ private final String attrName;
+ private final Object attrValue;
+
+ public MatchesReferenceableProperty(String attrName, Object attrValue) {
+ this.attrName = attrName;
+ this.attrValue = attrValue;
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ return o instanceof Referenceable && attrValue.equals(((Referenceable) o).get(attrName));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8c4a7fae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7470e65..095a8dd 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1171,7 +1171,7 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.5</version>
- <scope>provided</scope>
+ <scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8c4a7fae/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 82e4357..da1f744 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -7,6 +7,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
+ATLAS-415 Hive import fails when importing a table that is already imported without StorageDescriptor information (yhemanth via shwethags)
ATLAS-450 quick_start fails on cygwin (dkantor via shwethags)
ATLAS-451 Doc: Fix few broken links due to Wiki words in Atlas documentation (ssainath via shwethags)
ATLAS-439 Investigate apache build failure - EntityJerseyResourceIT.testEntityDeduping (shwethags)