Posted to commits@atlas.apache.org by ma...@apache.org on 2018/03/23 22:58:46 UTC
atlas git commit: ATLAS-2462: Sqoop import for all tables throws NPE for no table provided in command
Repository: atlas
Updated Branches:
refs/heads/branch-0.8 4e27bf086 -> 7adbf8ffd
ATLAS-2462: Sqoop import for all tables throws NPE for no table provided in command
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/7adbf8ff
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/7adbf8ff
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/7adbf8ff
Branch: refs/heads/branch-0.8
Commit: 7adbf8ffd27904577ecf694a9f861cdd46833069
Parents: 4e27bf0
Author: rdsolani <rd...@gmail.com>
Authored: Fri Mar 23 17:18:20 2018 +0530
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Mar 23 15:43:46 2018 -0700
----------------------------------------------------------------------
.../org/apache/atlas/sqoop/hook/SqoopHook.java | 62 ++++++++++++--------
1 file changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
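The change below guards against a null Hive table: when a Sqoop import-all-tables run publishes lineage, data.getHiveTable() is null, and the previous code dereferenced it unconditionally. The fix creates the hive_table entity only when a table name is present and otherwise points the Sqoop process at the hive_db entity. The following is a minimal standalone sketch of that fallback, not the Atlas API itself (plain JDK; the qualified-name formats are assumed to follow Atlas's <db>.<table>@<cluster> and <db>@<cluster> conventions):

// Standalone illustration of the null-guard pattern applied in this patch:
// when no Hive table is present (e.g. import-all-tables), lineage falls back
// to the Hive database instead of dereferencing a null table name.
import java.util.Locale;

public class SqoopLineageTargetSketch {

    // Hypothetical inputs mirroring the SqoopJobDataPublisher.Data fields used in the diff.
    static String lineageTarget(String clusterName, String hiveDb, String hiveTable) {
        if (hiveTable != null && !hiveTable.isEmpty()) {
            // table-level lineage: <db>.<table>@<cluster>
            return String.format("%s.%s@%s",
                    hiveDb.toLowerCase(Locale.ROOT), hiveTable.toLowerCase(Locale.ROOT), clusterName);
        }
        // no table given: fall back to database-level lineage: <db>@<cluster>
        return String.format("%s@%s", hiveDb.toLowerCase(Locale.ROOT), clusterName);
    }

    public static void main(String[] args) {
        System.out.println(lineageTarget("cl1", "default", "customers")); // default.customers@cl1
        System.out.println(lineageTarget("cl1", "default", null));        // default@cl1 (previously an NPE)
    }
}

In the actual hook the same guard also decides which AtlasObjectId is wired into the Sqoop process INPUTS/OUTPUTS, as shown in the toSqoopProcessEntity hunk in the diff below.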
http://git-wip-us.apache.org/repos/asf/atlas/blob/7adbf8ff/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 9e43430..666ec13 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -28,7 +28,8 @@ import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.hook.AtlasHookException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
-import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequestV2;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequestV2;
import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.atlas.sqoop.model.SqoopDataTypes;
import org.apache.atlas.type.AtlasTypeUtil;
@@ -39,12 +40,12 @@ import org.apache.sqoop.util.ImportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.Map;
+import java.util.HashMap;
import java.util.Properties;
-
+import java.util.List;
+import java.util.Date;
/**
* AtlasHook sends lineage information to the AtlasSever.
*/
@@ -78,26 +79,30 @@ public class SqoopHook extends SqoopJobDataPublisher {
try {
Configuration atlasProperties = ApplicationProperties.get();
String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
- AtlasEntity entDbStore = createDBStoreInstance(data);
- AtlasEntity entHiveDb = createHiveDatabaseInstance(clusterName, data.getHiveDB());
- AtlasEntity entHiveTable = createHiveTableInstance(entHiveDb, data.getHiveTable());
- AtlasEntity entProcess = createSqoopProcessInstance(entDbStore, entHiveTable, data, clusterName);
+ AtlasEntity entDbStore = toSqoopDBStoreEntity(data);
+ AtlasEntity entHiveDb = toHiveDatabaseEntity(clusterName, data.getHiveDB());
+ AtlasEntity entHiveTable = data.getHiveTable() != null ? toHiveTableEntity(entHiveDb, data.getHiveTable()) : null;
+ AtlasEntity entProcess = toSqoopProcessEntity(entDbStore, entHiveDb, entHiveTable, data, clusterName);
AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess);
entities.addReferredEntity(entDbStore);
entities.addReferredEntity(entHiveDb);
- entities.addReferredEntity(entHiveTable);
+ if (entHiveTable != null) {
+ entities.addReferredEntity(entHiveTable);
+ }
- HookNotificationMessage message = new EntityUpdateRequestV2(AtlasHook.getUser(), entities);
+ HookNotificationMessage message = new EntityCreateRequestV2(AtlasHook.getUser(), entities);
- AtlasHook.notifyEntities(Arrays.asList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
+ AtlasHook.notifyEntities(Collections.singletonList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
} catch(Exception e) {
+ LOG.error("SqoopHook.publish() failed", e);
+
throw new AtlasHookException("SqoopHook.publish() failed.", e);
}
}
- private AtlasEntity createHiveDatabaseInstance(String clusterName, String dbName) {
+ private AtlasEntity toHiveDatabaseEntity(String clusterName, String dbName) {
AtlasEntity entHiveDb = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
String qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName);
@@ -108,7 +113,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
return entHiveDb;
}
- private AtlasEntity createHiveTableInstance(AtlasEntity entHiveDb, String tableName) {
+ private AtlasEntity toHiveTableEntity(AtlasEntity entHiveDb, String tableName) {
AtlasEntity entHiveTable = new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName());
String qualifiedName = HiveMetaStoreBridge.getTableQualifiedName((String)entHiveDb.getAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE), (String)entHiveDb.getAttribute(AtlasClient.NAME), tableName);
@@ -119,7 +124,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
return entHiveTable;
}
- private AtlasEntity createDBStoreInstance(SqoopJobDataPublisher.Data data) throws ImportException {
+ private AtlasEntity toSqoopDBStoreEntity(SqoopJobDataPublisher.Data data) throws ImportException {
String table = data.getStoreTable();
String query = data.getStoreQuery();
@@ -145,7 +150,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
return entDbStore;
}
- private AtlasEntity createSqoopProcessInstance(AtlasEntity entDbStore, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) {
+ private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore, AtlasEntity entHiveDb, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) {
AtlasEntity entProcess = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName());
String sqoopProcessName = getSqoopProcessName(data, clusterName);
Map<String, String> sqoopOptionsMap = new HashMap<>();
@@ -159,12 +164,15 @@ public class SqoopHook extends SqoopJobDataPublisher {
entProcess.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
entProcess.setAttribute(SqoopHook.OPERATION, data.getOperation());
+ List<AtlasObjectId> sqoopObjects = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entDbStore));
+ List<AtlasObjectId> hiveObjects = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entHiveTable != null ? entHiveTable : entHiveDb));
+
if (isImportOperation(data)) {
- entProcess.setAttribute(SqoopHook.INPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entDbStore)));
- entProcess.setAttribute(SqoopHook.OUTPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entHiveTable)));
+ entProcess.setAttribute(SqoopHook.INPUTS, sqoopObjects);
+ entProcess.setAttribute(SqoopHook.OUTPUTS, hiveObjects);
} else {
- entProcess.setAttribute(SqoopHook.INPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entHiveTable)));
- entProcess.setAttribute(SqoopHook.OUTPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entDbStore)));
+ entProcess.setAttribute(SqoopHook.INPUTS, hiveObjects);
+ entProcess.setAttribute(SqoopHook.OUTPUTS, sqoopObjects);
}
entProcess.setAttribute(SqoopHook.USER, data.getUser());
@@ -182,15 +190,21 @@ public class SqoopHook extends SqoopJobDataPublisher {
static String getSqoopProcessName(Data data, String clusterName) {
StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), data.getUrl()));
- if (StringUtils.isNotEmpty(data.getStoreTable())) {
+ if (StringUtils.isNotEmpty(data.getHiveTable())) {
name.append(" --table ").append(data.getStoreTable());
+ } else {
+ name.append(" --database ").append(data.getHiveDB());
}
if (StringUtils.isNotEmpty(data.getStoreQuery())) {
name.append(" --query ").append(data.getStoreQuery());
}
- name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+ if (data.getHiveTable() != null) {
+ name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+ } else {
+ name.append(String.format("--hive-%s --hive-database %s --hive-cluster %s", data.getOperation(), data.getHiveDB(), clusterName));
+ }
return name.toString();
}
@@ -198,8 +212,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
static String getSqoopDBStoreName(SqoopJobDataPublisher.Data data) {
StringBuilder name = new StringBuilder(String.format("%s --url %s", data.getStoreType(), data.getUrl()));
- if (StringUtils.isNotEmpty(data.getStoreTable())) {
+ if (StringUtils.isNotEmpty(data.getHiveTable())) {
name.append(" --table ").append(data.getStoreTable());
+ } else {
+ name.append(" --database ").append(data.getHiveDB());
}
if (StringUtils.isNotEmpty(data.getStoreQuery())) {