Posted to commits@atlas.apache.org by ma...@apache.org on 2018/02/13 22:08:18 UTC

atlas git commit: ATLAS-2439: updated Sqoop hook to use V2 notifications

Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 99593dff7 -> e8908dbfe


ATLAS-2439: updated Sqoop hook to use V2 notifications

Signed-off-by: Madhan Neethiraj <ma...@apache.org>
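
In short: this change moves the Sqoop hook from V1 notifications (Referenceable objects
sent via HookNotification.EntityCreateRequest) to V2 notifications (AtlasEntity objects
wrapped in an AtlasEntity.AtlasEntitiesWithExtInfo and sent via
HookNotification.EntityUpdateRequestV2); references between entities are now set as
object ids via AtlasTypeUtil.getAtlasObjectId(...) instead of nesting the Referenceable
itself. A minimal sketch of the V2 pattern follows; every type and call appears in the
patch below, while entDbStore/entHiveDb/entHiveTable stand for the entities built by the
create*Instance methods and "user"/"numRetries" are placeholder arguments:

    // Sketch only -- not part of the commit. Imports match those added in the
    // diff: AtlasEntity, AtlasEntity.AtlasEntitiesWithExtInfo,
    // HookNotification.EntityUpdateRequestV2, HookNotificationMessage.
    AtlasEntity entProcess = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName());
    entProcess.setAttribute(AtlasClient.NAME, sqoopProcessName);

    // The primary entity rides in the constructor; the entities it references
    // (DB store, Hive DB, Hive table) travel alongside as referred entities.
    AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess);
    entities.addReferredEntity(entDbStore);
    entities.addReferredEntity(entHiveDb);
    entities.addReferredEntity(entHiveTable);

    // One V2 message replaces the V1 EntityCreateRequest with its flat entity list.
    HookNotificationMessage message = new EntityUpdateRequestV2(user, entities);
    AtlasHook.notifyEntities(Arrays.asList(message), numRetries);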


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/e8908dbf
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/e8908dbf
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/e8908dbf

Branch: refs/heads/branch-0.8
Commit: e8908dbfe1d4dfe641cc7a802625f396aa0a399d
Parents: 99593df
Author: rdsolani <rd...@gmail.com>
Authored: Thu Feb 1 19:41:33 2018 +0530
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Feb 13 14:03:34 2018 -0800

----------------------------------------------------------------------
 .../org/apache/atlas/sqoop/hook/SqoopHook.java  | 213 ++++++++++---------
 1 file changed, 114 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/e8908dbf/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 50e20fa..5201122 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -26,9 +26,12 @@ import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.hook.AtlasHookException;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequestV2;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 import org.apache.atlas.sqoop.model.SqoopDataTypes;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sqoop.SqoopJobDataPublisher;
@@ -46,151 +49,163 @@ import java.util.Properties;
  * AtlasHook sends lineage information to the Atlas server.
  */
 public class SqoopHook extends SqoopJobDataPublisher {
-
     private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);
-    public static final String CONF_PREFIX = "atlas.hook.sqoop.";
-    public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
 
-    public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name";
+    public static final String CONF_PREFIX          = "atlas.hook.sqoop.";
+    public static final String HOOK_NUM_RETRIES     = CONF_PREFIX + "numRetries";
+    public static final String ATLAS_CLUSTER_NAME   = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
 
-    public static final String USER = "userName";
-    public static final String DB_STORE_TYPE = "dbStoreType";
+    public static final String USER           = "userName";
+    public static final String DB_STORE_TYPE  = "dbStoreType";
     public static final String DB_STORE_USAGE = "storeUse";
-    public static final String SOURCE = "source";
-    public static final String DESCRIPTION = "description";
-    public static final String STORE_URI = "storeUri";
-    public static final String OPERATION = "operation";
-    public static final String START_TIME = "startTime";
-    public static final String END_TIME = "endTime";
-    public static final String CMD_LINE_OPTS = "commandlineOpts";
-    // multiple inputs and outputs for process
-    public static final String INPUTS = "inputs";
-    public static final String OUTPUTS = "outputs";
+    public static final String SOURCE         = "source";
+    public static final String DESCRIPTION    = "description";
+    public static final String STORE_URI      = "storeUri";
+    public static final String OPERATION      = "operation";
+    public static final String START_TIME     = "startTime";
+    public static final String END_TIME       = "endTime";
+    public static final String CMD_LINE_OPTS  = "commandlineOpts";
+    public static final String INPUTS         = "inputs";
+    public static final String OUTPUTS        = "outputs";
 
     static {
         org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml");
     }
 
-    public Referenceable createHiveDatabaseInstance(String clusterName, String dbName) {
-        Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
-        dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
-        dbRef.set(AtlasClient.NAME, dbName);
-        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
-        return dbRef;
-    }
+    @Override
+    public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException {
+        try {
+            Configuration atlasProperties = ApplicationProperties.get();
+            String        clusterName     = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+            AtlasEntity   entDbStore      = createDBStoreInstance(data);
+            AtlasEntity   entHiveDb       = createHiveDatabaseInstance(clusterName, data.getHiveDB());
+            AtlasEntity   entHiveTable    = createHiveTableInstance(entHiveDb, data.getHiveTable());
+            AtlasEntity   entProcess      = createSqoopProcessInstance(entDbStore, entHiveTable, data, clusterName);
+
+            AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess);
 
-    public Referenceable createHiveTableInstance(String clusterName, Referenceable dbRef,
-                                                 String tableName, String dbName) {
-            Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
-            tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                    HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
-            tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
-            tableRef.set(HiveMetaStoreBridge.DB, dbRef);
-            return tableRef;
+            entities.addReferredEntity(entDbStore);
+            entities.addReferredEntity(entHiveDb);
+            entities.addReferredEntity(entHiveTable);
+
+            HookNotificationMessage message  = new EntityUpdateRequestV2(AtlasHook.getUser(), entities);
+
+            AtlasHook.notifyEntities(Arrays.asList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
+        } catch(Exception e) {
+            throw new AtlasHookException("SqoopHook.publish() failed.", e);
         }
+    }
 
-    private Referenceable createDBStoreInstance(SqoopJobDataPublisher.Data data)
-            throws ImportException {
+    private AtlasEntity createHiveDatabaseInstance(String clusterName, String dbName) {
+        AtlasEntity entHiveDb     = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
+        String      qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName);
+
+        entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+        entHiveDb.setAttribute(AtlasClient.NAME, dbName);
+        entHiveDb.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
+
+        return entHiveDb;
+    }
 
-        Referenceable storeRef = new Referenceable(SqoopDataTypes.SQOOP_DBDATASTORE.getName());
+    private AtlasEntity createHiveTableInstance(AtlasEntity entHiveDb, String tableName) {
+        AtlasEntity entHiveTable  = new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName());
+        String      qualifiedName = HiveMetaStoreBridge.getTableQualifiedName((String)entHiveDb.getAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE), (String)entHiveDb.getAttribute(AtlasClient.NAME), tableName);
+
+        entHiveTable.setAttribute(AtlasClient.NAME, tableName.toLowerCase());
+        entHiveTable.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
+        entHiveTable.setAttribute(HiveMetaStoreBridge.DB, AtlasTypeUtil.getAtlasObjectId(entHiveDb));
+
+        return entHiveTable;
+    }
+
+    private AtlasEntity createDBStoreInstance(SqoopJobDataPublisher.Data data) throws ImportException {
         String table = data.getStoreTable();
         String query = data.getStoreQuery();
+
         if (StringUtils.isBlank(table) && StringUtils.isBlank(query)) {
             throw new ImportException("Both table and query cannot be empty for DBStoreInstance");
         }
 
-        String usage = table != null ? "TABLE" : "QUERY";
+        String usage  = table != null ? "TABLE" : "QUERY";
         String source = table != null ? table : query;
-        String name = getSqoopDBStoreName(data);
-        storeRef.set(AtlasClient.NAME, name);
-        storeRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
-        storeRef.set(SqoopHook.DB_STORE_TYPE, data.getStoreType());
-        storeRef.set(SqoopHook.DB_STORE_USAGE, usage);
-        storeRef.set(SqoopHook.STORE_URI, data.getUrl());
-        storeRef.set(SqoopHook.SOURCE, source);
-        storeRef.set(SqoopHook.DESCRIPTION, "");
-        storeRef.set(AtlasClient.OWNER, data.getUser());
-        return storeRef;
+        String name   = getSqoopDBStoreName(data);
+
+        AtlasEntity entDbStore = new AtlasEntity(SqoopDataTypes.SQOOP_DBDATASTORE.getName());
+
+        entDbStore.setAttribute(AtlasClient.NAME, name);
+        entDbStore.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
+        entDbStore.setAttribute(SqoopHook.DB_STORE_TYPE, data.getStoreType());
+        entDbStore.setAttribute(SqoopHook.DB_STORE_USAGE, usage);
+        entDbStore.setAttribute(SqoopHook.STORE_URI, data.getUrl());
+        entDbStore.setAttribute(SqoopHook.SOURCE, source);
+        entDbStore.setAttribute(SqoopHook.DESCRIPTION, "");
+        entDbStore.setAttribute(AtlasClient.OWNER, data.getUser());
+
+        return entDbStore;
     }
 
-    private Referenceable createSqoopProcessInstance(Referenceable dbStoreRef, Referenceable hiveTableRef,
-                                                     SqoopJobDataPublisher.Data data, String clusterName) {
-        Referenceable procRef = new Referenceable(SqoopDataTypes.SQOOP_PROCESS.getName());
-        final String sqoopProcessName = getSqoopProcessName(data, clusterName);
-        procRef.set(AtlasClient.NAME, sqoopProcessName);
-        procRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
-        procRef.set(SqoopHook.OPERATION, data.getOperation());
-        if (isImportOperation(data)) {
-            procRef.set(SqoopHook.INPUTS, dbStoreRef);
-            procRef.set(SqoopHook.OUTPUTS, hiveTableRef);
-        } else {
-            procRef.set(SqoopHook.INPUTS, hiveTableRef);
-            procRef.set(SqoopHook.OUTPUTS, dbStoreRef);
-        }
-        procRef.set(SqoopHook.USER, data.getUser());
-        procRef.set(SqoopHook.START_TIME, new Date(data.getStartTime()));
-        procRef.set(SqoopHook.END_TIME, new Date(data.getEndTime()));
+    private AtlasEntity createSqoopProcessInstance(AtlasEntity entDbStore, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) {
+        AtlasEntity         entProcess       = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName());
+        String              sqoopProcessName = getSqoopProcessName(data, clusterName);
+        Map<String, String> sqoopOptionsMap  = new HashMap<>();
+        Properties          options          = data.getOptions();
 
-        Map<String, String> sqoopOptionsMap = new HashMap<>();
-        Properties options = data.getOptions();
         for (Object k : options.keySet()) {
             sqoopOptionsMap.put((String)k, (String) options.get(k));
         }
-        procRef.set(SqoopHook.CMD_LINE_OPTS, sqoopOptionsMap);
 
-        return procRef;
+        entProcess.setAttribute(AtlasClient.NAME, sqoopProcessName);
+        entProcess.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
+        entProcess.setAttribute(SqoopHook.OPERATION, data.getOperation());
+
+        if (isImportOperation(data)) {
+            entProcess.setAttribute(SqoopHook.INPUTS, AtlasTypeUtil.getAtlasObjectId(entDbStore));
+            entProcess.setAttribute(SqoopHook.OUTPUTS, AtlasTypeUtil.getAtlasObjectId(entHiveTable));
+        } else {
+            entProcess.setAttribute(SqoopHook.INPUTS, AtlasTypeUtil.getAtlasObjectId(entHiveTable));
+            entProcess.setAttribute(SqoopHook.OUTPUTS, AtlasTypeUtil.getAtlasObjectId(entDbStore));
+        }
+
+        entProcess.setAttribute(SqoopHook.USER, data.getUser());
+        entProcess.setAttribute(SqoopHook.START_TIME, new Date(data.getStartTime()));
+        entProcess.setAttribute(SqoopHook.END_TIME, new Date(data.getEndTime()));
+        entProcess.setAttribute(SqoopHook.CMD_LINE_OPTS, sqoopOptionsMap);
+
+        return entProcess;
+    }
+
+    private boolean isImportOperation(SqoopJobDataPublisher.Data data) {
+        return data.getOperation().toLowerCase().equals("import");
     }
 
     static String getSqoopProcessName(Data data, String clusterName) {
-        StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(),
-                data.getUrl()));
+        StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), data.getUrl()));
+
         if (StringUtils.isNotEmpty(data.getStoreTable())) {
             name.append(" --table ").append(data.getStoreTable());
         }
+
         if (StringUtils.isNotEmpty(data.getStoreQuery())) {
             name.append(" --query ").append(data.getStoreQuery());
         }
-        name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s",
-                data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+
+        name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+
         return name.toString();
     }
 
     static String getSqoopDBStoreName(SqoopJobDataPublisher.Data data)  {
         StringBuilder name = new StringBuilder(String.format("%s --url %s", data.getStoreType(), data.getUrl()));
+
         if (StringUtils.isNotEmpty(data.getStoreTable())) {
             name.append(" --table ").append(data.getStoreTable());
         }
+
         if (StringUtils.isNotEmpty(data.getStoreQuery())) {
             name.append(" --query ").append(data.getStoreQuery());
         }
-        return name.toString();
-    }
 
-    static boolean isImportOperation(SqoopJobDataPublisher.Data data) {
-        return data.getOperation().toLowerCase().equals("import");
-    }
-
-    @Override
-    public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException {
-        try {
-            Configuration atlasProperties = ApplicationProperties.get();
-            String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
-
-            Referenceable dbStoreRef = createDBStoreInstance(data);
-            Referenceable dbRef = createHiveDatabaseInstance(clusterName, data.getHiveDB());
-            Referenceable hiveTableRef = createHiveTableInstance(clusterName, dbRef,
-                    data.getHiveTable(), data.getHiveDB());
-            Referenceable procRef = createSqoopProcessInstance(dbStoreRef, hiveTableRef, data, clusterName);
-
-            int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
-            HookNotification.HookNotificationMessage message =
-                    new HookNotification.EntityCreateRequest(AtlasHook.getUser(), dbStoreRef, dbRef, hiveTableRef, procRef);
-            AtlasHook.notifyEntities(Arrays.asList(message), maxRetries);
-        }
-        catch(Exception e) {
-            throw new AtlasHookException("SqoopHook.publish() failed.", e);
-        }
+        return name.toString();
     }
 }
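
For reference, the lineage direction and derived names follow directly from the code
above. With hypothetical inputs (values invented for illustration): an import where
url=jdbc:mysql://dbhost/sales, storeTable=CUSTOMERS, storeType=mysql, hiveDB=default,
hiveTable=customers and clusterName=primary, the helpers return (one line each):

    getSqoopProcessName(): sqoop import --connect jdbc:mysql://dbhost/sales --table CUSTOMERS --hive-import --hive-database default --hive-table customers --hive-cluster primary
    getSqoopDBStoreName(): mysql --url jdbc:mysql://dbhost/sales --table CUSTOMERS

Since isImportOperation() is true here, the process entity's "inputs" attribute points
at the DB store and "outputs" at the Hive table; for any other operation the direction
is reversed.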