Posted to commits@atlas.apache.org by ma...@apache.org on 2018/03/23 22:58:46 UTC

atlas git commit: ATLAS-2462: Sqoop import for all tables throws NPE for no table provided in command

Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 4e27bf086 -> 7adbf8ffd


ATLAS-2462: Sqoop import for all tables throws NPE for no table provided in command

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/7adbf8ff
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/7adbf8ff
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/7adbf8ff

Branch: refs/heads/branch-0.8
Commit: 7adbf8ffd27904577ecf694a9f861cdd46833069
Parents: 4e27bf0
Author: rdsolani <rd...@gmail.com>
Authored: Fri Mar 23 17:18:20 2018 +0530
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Mar 23 15:43:46 2018 -0700

----------------------------------------------------------------------
 .../org/apache/atlas/sqoop/hook/SqoopHook.java  | 62 ++++++++++++--------
 1 file changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
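
Root cause: a "sqoop import-all-tables" run carries no table name, so SqoopJobDataPublisher.Data.getHiveTable() returns null. The previous code dereferenced that value unconditionally, for example data.getHiveTable().toLowerCase() in getSqoopProcessName() and AtlasTypeUtil.getAtlasObjectId(entHiveTable) when wiring process lineage, which raised the NPE. The fix builds the hive_table entity only when a table name is present and otherwise falls back to the hive_db entity as the lineage endpoint. The two key lines from the patch, lightly annotated (surrounding context simplified):

    // guard: entHiveTable stays null for import-all-tables, where no single table name exists
    AtlasEntity entHiveTable = data.getHiveTable() != null ? toHiveTableEntity(entHiveDb, data.getHiveTable()) : null;

    // fallback: lineage points at the Hive database entity when no table entity was built
    List<AtlasObjectId> hiveObjects = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entHiveTable != null ? entHiveTable : entHiveDb));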


http://git-wip-us.apache.org/repos/asf/atlas/blob/7adbf8ff/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 9e43430..666ec13 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -28,7 +28,8 @@ import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.hook.AtlasHookException;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
-import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequestV2;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequestV2;
 import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 import org.apache.atlas.sqoop.model.SqoopDataTypes;
 import org.apache.atlas.type.AtlasTypeUtil;
@@ -39,12 +40,12 @@ import org.apache.sqoop.util.ImportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.Properties;
-
+import java.util.List;
+import java.util.Date;
 /**
 * AtlasHook sends lineage information to the Atlas server.
  */
@@ -78,26 +79,30 @@ public class SqoopHook extends SqoopJobDataPublisher {
         try {
             Configuration atlasProperties = ApplicationProperties.get();
             String        clusterName     = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
-            AtlasEntity   entDbStore      = createDBStoreInstance(data);
-            AtlasEntity   entHiveDb       = createHiveDatabaseInstance(clusterName, data.getHiveDB());
-            AtlasEntity   entHiveTable    = createHiveTableInstance(entHiveDb, data.getHiveTable());
-            AtlasEntity   entProcess      = createSqoopProcessInstance(entDbStore, entHiveTable, data, clusterName);
+            AtlasEntity   entDbStore      = toSqoopDBStoreEntity(data);
+            AtlasEntity   entHiveDb       = toHiveDatabaseEntity(clusterName, data.getHiveDB());
+            AtlasEntity   entHiveTable    = data.getHiveTable() != null ? toHiveTableEntity(entHiveDb, data.getHiveTable()) : null;
+            AtlasEntity   entProcess      = toSqoopProcessEntity(entDbStore, entHiveDb, entHiveTable, data, clusterName);
 
             AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess);
 
             entities.addReferredEntity(entDbStore);
             entities.addReferredEntity(entHiveDb);
-            entities.addReferredEntity(entHiveTable);
+            if (entHiveTable != null) {
+                entities.addReferredEntity(entHiveTable);
+            }
 
-            HookNotificationMessage message  = new EntityUpdateRequestV2(AtlasHook.getUser(), entities);
+            HookNotificationMessage message = new EntityCreateRequestV2(AtlasHook.getUser(), entities);
 
-            AtlasHook.notifyEntities(Arrays.asList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
+            AtlasHook.notifyEntities(Collections.singletonList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
         } catch(Exception e) {
+            LOG.error("SqoopHook.publish() failed", e);
+
             throw new AtlasHookException("SqoopHook.publish() failed.", e);
         }
     }
 
-    private AtlasEntity createHiveDatabaseInstance(String clusterName, String dbName) {
+    private AtlasEntity toHiveDatabaseEntity(String clusterName, String dbName) {
         AtlasEntity entHiveDb     = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
         String      qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName);
 
@@ -108,7 +113,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
         return entHiveDb;
     }
 
-    private AtlasEntity createHiveTableInstance(AtlasEntity entHiveDb, String tableName) {
+    private AtlasEntity toHiveTableEntity(AtlasEntity entHiveDb, String tableName) {
         AtlasEntity entHiveTable  = new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName());
         String      qualifiedName = HiveMetaStoreBridge.getTableQualifiedName((String)entHiveDb.getAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE), (String)entHiveDb.getAttribute(AtlasClient.NAME), tableName);
 
@@ -119,7 +124,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
         return entHiveTable;
     }
 
-    private AtlasEntity createDBStoreInstance(SqoopJobDataPublisher.Data data) throws ImportException {
+    private AtlasEntity toSqoopDBStoreEntity(SqoopJobDataPublisher.Data data) throws ImportException {
         String table = data.getStoreTable();
         String query = data.getStoreQuery();
 
@@ -145,7 +150,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
         return entDbStore;
     }
 
-    private AtlasEntity createSqoopProcessInstance(AtlasEntity entDbStore, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) {
+    private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore, AtlasEntity entHiveDb, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) {
         AtlasEntity         entProcess       = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName());
         String              sqoopProcessName = getSqoopProcessName(data, clusterName);
         Map<String, String> sqoopOptionsMap  = new HashMap<>();
@@ -159,12 +164,15 @@ public class SqoopHook extends SqoopJobDataPublisher {
         entProcess.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
         entProcess.setAttribute(SqoopHook.OPERATION, data.getOperation());
 
+        List<AtlasObjectId> sqoopObjects = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entDbStore));
+        List<AtlasObjectId> hiveObjects  = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entHiveTable != null ? entHiveTable : entHiveDb));
+
         if (isImportOperation(data)) {
-            entProcess.setAttribute(SqoopHook.INPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entDbStore)));
-            entProcess.setAttribute(SqoopHook.OUTPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entHiveTable)));
+            entProcess.setAttribute(SqoopHook.INPUTS, sqoopObjects);
+            entProcess.setAttribute(SqoopHook.OUTPUTS, hiveObjects);
         } else {
-            entProcess.setAttribute(SqoopHook.INPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entHiveTable)));
-            entProcess.setAttribute(SqoopHook.OUTPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entDbStore)));
+            entProcess.setAttribute(SqoopHook.INPUTS, hiveObjects);
+            entProcess.setAttribute(SqoopHook.OUTPUTS, sqoopObjects);
         }
 
         entProcess.setAttribute(SqoopHook.USER, data.getUser());
@@ -182,15 +190,21 @@ public class SqoopHook extends SqoopJobDataPublisher {
     static String getSqoopProcessName(Data data, String clusterName) {
         StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), data.getUrl()));
 
-        if (StringUtils.isNotEmpty(data.getStoreTable())) {
+        if (StringUtils.isNotEmpty(data.getHiveTable())) {
             name.append(" --table ").append(data.getStoreTable());
+        } else {
+            name.append(" --database ").append(data.getHiveDB());
         }
 
         if (StringUtils.isNotEmpty(data.getStoreQuery())) {
             name.append(" --query ").append(data.getStoreQuery());
         }
 
-        name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+        if (data.getHiveTable() != null) {
+            name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+        } else {
+            name.append(String.format(" --hive-%s --hive-database %s --hive-cluster %s", data.getOperation(), data.getHiveDB(), clusterName));
+        }
 
         return name.toString();
     }
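
With the new else branch, an all-tables import yields a process name without a --table component. Using assumed illustrative values (operation "import", Hive database "sales", cluster "primary", URL jdbc:mysql://host/sales), the generated name would read:

    sqoop import --connect jdbc:mysql://host/sales --database sales --hive-import --hive-database sales --hive-cluster primary

Note that this branch, unlike the single-table one, does not lowercase the database name.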
@@ -198,8 +212,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
     static String getSqoopDBStoreName(SqoopJobDataPublisher.Data data)  {
         StringBuilder name = new StringBuilder(String.format("%s --url %s", data.getStoreType(), data.getUrl()));
 
-        if (StringUtils.isNotEmpty(data.getStoreTable())) {
+        if (StringUtils.isNotEmpty(data.getHiveTable())) {
             name.append(" --table ").append(data.getStoreTable());
+        } else {
+            name.append(" --database ").append(data.getHiveDB());
         }
 
         if (StringUtils.isNotEmpty(data.getStoreQuery())) {