You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/02/13 22:08:18 UTC
atlas git commit: ATLAS-2439: updated Sqoop hook to use V2 notifications
Repository: atlas
Updated Branches:
refs/heads/branch-0.8 99593dff7 -> e8908dbfe
ATLAS-2439: updated Sqoop hook to use V2 notifications
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/e8908dbf
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/e8908dbf
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/e8908dbf
Branch: refs/heads/branch-0.8
Commit: e8908dbfe1d4dfe641cc7a802625f396aa0a399d
Parents: 99593df
Author: rdsolani <rd...@gmail.com>
Authored: Thu Feb 1 19:41:33 2018 +0530
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Feb 13 14:03:34 2018 -0800
----------------------------------------------------------------------
.../org/apache/atlas/sqoop/hook/SqoopHook.java | 213 ++++++++++---------
1 file changed, 114 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/e8908dbf/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 50e20fa..5201122 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -26,9 +26,12 @@ import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.hook.AtlasHookException;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequestV2;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.atlas.sqoop.model.SqoopDataTypes;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.sqoop.SqoopJobDataPublisher;
@@ -46,151 +49,163 @@ import java.util.Properties;
* AtlasHook sends lineage information to the AtlasSever.
*/
public class SqoopHook extends SqoopJobDataPublisher {
-
private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);
- public static final String CONF_PREFIX = "atlas.hook.sqoop.";
- public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
- public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name";
+ public static final String CONF_PREFIX = "atlas.hook.sqoop.";
+ public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
+ public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary";
- public static final String USER = "userName";
- public static final String DB_STORE_TYPE = "dbStoreType";
+ public static final String USER = "userName";
+ public static final String DB_STORE_TYPE = "dbStoreType";
public static final String DB_STORE_USAGE = "storeUse";
- public static final String SOURCE = "source";
- public static final String DESCRIPTION = "description";
- public static final String STORE_URI = "storeUri";
- public static final String OPERATION = "operation";
- public static final String START_TIME = "startTime";
- public static final String END_TIME = "endTime";
- public static final String CMD_LINE_OPTS = "commandlineOpts";
- // multiple inputs and outputs for process
- public static final String INPUTS = "inputs";
- public static final String OUTPUTS = "outputs";
+ public static final String SOURCE = "source";
+ public static final String DESCRIPTION = "description";
+ public static final String STORE_URI = "storeUri";
+ public static final String OPERATION = "operation";
+ public static final String START_TIME = "startTime";
+ public static final String END_TIME = "endTime";
+ public static final String CMD_LINE_OPTS = "commandlineOpts";
+ public static final String INPUTS = "inputs";
+ public static final String OUTPUTS = "outputs";
static {
org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml");
}
- public Referenceable createHiveDatabaseInstance(String clusterName, String dbName) {
- Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
- dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
- dbRef.set(AtlasClient.NAME, dbName);
- dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
- return dbRef;
- }
+ @Override
+ public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException {
+ try {
+ Configuration atlasProperties = ApplicationProperties.get();
+ String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+ AtlasEntity entDbStore = createDBStoreInstance(data);
+ AtlasEntity entHiveDb = createHiveDatabaseInstance(clusterName, data.getHiveDB());
+ AtlasEntity entHiveTable = createHiveTableInstance(entHiveDb, data.getHiveTable());
+ AtlasEntity entProcess = createSqoopProcessInstance(entDbStore, entHiveTable, data, clusterName);
+
+ AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess);
- public Referenceable createHiveTableInstance(String clusterName, Referenceable dbRef,
- String tableName, String dbName) {
- Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
- tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
- tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
- tableRef.set(HiveMetaStoreBridge.DB, dbRef);
- return tableRef;
+ entities.addReferredEntity(entDbStore);
+ entities.addReferredEntity(entHiveDb);
+ entities.addReferredEntity(entHiveTable);
+
+ HookNotificationMessage message = new EntityUpdateRequestV2(AtlasHook.getUser(), entities);
+
+ AtlasHook.notifyEntities(Arrays.asList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
+ } catch(Exception e) {
+ throw new AtlasHookException("SqoopHook.publish() failed.", e);
}
+ }
- private Referenceable createDBStoreInstance(SqoopJobDataPublisher.Data data)
- throws ImportException {
+ private AtlasEntity createHiveDatabaseInstance(String clusterName, String dbName) {
+ AtlasEntity entHiveDb = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
+ String qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName);
+
+ entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+ entHiveDb.setAttribute(AtlasClient.NAME, dbName);
+ entHiveDb.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
+
+ return entHiveDb;
+ }
- Referenceable storeRef = new Referenceable(SqoopDataTypes.SQOOP_DBDATASTORE.getName());
+ private AtlasEntity createHiveTableInstance(AtlasEntity entHiveDb, String tableName) {
+ AtlasEntity entHiveTable = new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName());
+ String qualifiedName = HiveMetaStoreBridge.getTableQualifiedName((String)entHiveDb.getAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE), (String)entHiveDb.getAttribute(AtlasClient.NAME), tableName);
+
+ entHiveTable.setAttribute(AtlasClient.NAME, tableName.toLowerCase());
+ entHiveTable.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
+ entHiveTable.setAttribute(HiveMetaStoreBridge.DB, AtlasTypeUtil.getAtlasObjectId(entHiveDb));
+
+ return entHiveTable;
+ }
+
+ private AtlasEntity createDBStoreInstance(SqoopJobDataPublisher.Data data) throws ImportException {
String table = data.getStoreTable();
String query = data.getStoreQuery();
+
if (StringUtils.isBlank(table) && StringUtils.isBlank(query)) {
throw new ImportException("Both table and query cannot be empty for DBStoreInstance");
}
- String usage = table != null ? "TABLE" : "QUERY";
+ String usage = table != null ? "TABLE" : "QUERY";
String source = table != null ? table : query;
- String name = getSqoopDBStoreName(data);
- storeRef.set(AtlasClient.NAME, name);
- storeRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
- storeRef.set(SqoopHook.DB_STORE_TYPE, data.getStoreType());
- storeRef.set(SqoopHook.DB_STORE_USAGE, usage);
- storeRef.set(SqoopHook.STORE_URI, data.getUrl());
- storeRef.set(SqoopHook.SOURCE, source);
- storeRef.set(SqoopHook.DESCRIPTION, "");
- storeRef.set(AtlasClient.OWNER, data.getUser());
- return storeRef;
+ String name = getSqoopDBStoreName(data);
+
+ AtlasEntity entDbStore = new AtlasEntity(SqoopDataTypes.SQOOP_DBDATASTORE.getName());
+
+ entDbStore.setAttribute(AtlasClient.NAME, name);
+ entDbStore.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
+ entDbStore.setAttribute(SqoopHook.DB_STORE_TYPE, data.getStoreType());
+ entDbStore.setAttribute(SqoopHook.DB_STORE_USAGE, usage);
+ entDbStore.setAttribute(SqoopHook.STORE_URI, data.getUrl());
+ entDbStore.setAttribute(SqoopHook.SOURCE, source);
+ entDbStore.setAttribute(SqoopHook.DESCRIPTION, "");
+ entDbStore.setAttribute(AtlasClient.OWNER, data.getUser());
+
+ return entDbStore;
}
- private Referenceable createSqoopProcessInstance(Referenceable dbStoreRef, Referenceable hiveTableRef,
- SqoopJobDataPublisher.Data data, String clusterName) {
- Referenceable procRef = new Referenceable(SqoopDataTypes.SQOOP_PROCESS.getName());
- final String sqoopProcessName = getSqoopProcessName(data, clusterName);
- procRef.set(AtlasClient.NAME, sqoopProcessName);
- procRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
- procRef.set(SqoopHook.OPERATION, data.getOperation());
- if (isImportOperation(data)) {
- procRef.set(SqoopHook.INPUTS, dbStoreRef);
- procRef.set(SqoopHook.OUTPUTS, hiveTableRef);
- } else {
- procRef.set(SqoopHook.INPUTS, hiveTableRef);
- procRef.set(SqoopHook.OUTPUTS, dbStoreRef);
- }
- procRef.set(SqoopHook.USER, data.getUser());
- procRef.set(SqoopHook.START_TIME, new Date(data.getStartTime()));
- procRef.set(SqoopHook.END_TIME, new Date(data.getEndTime()));
+ private AtlasEntity createSqoopProcessInstance(AtlasEntity entDbStore, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) {
+ AtlasEntity entProcess = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName());
+ String sqoopProcessName = getSqoopProcessName(data, clusterName);
+ Map<String, String> sqoopOptionsMap = new HashMap<>();
+ Properties options = data.getOptions();
- Map<String, String> sqoopOptionsMap = new HashMap<>();
- Properties options = data.getOptions();
for (Object k : options.keySet()) {
sqoopOptionsMap.put((String)k, (String) options.get(k));
}
- procRef.set(SqoopHook.CMD_LINE_OPTS, sqoopOptionsMap);
- return procRef;
+ entProcess.setAttribute(AtlasClient.NAME, sqoopProcessName);
+ entProcess.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
+ entProcess.setAttribute(SqoopHook.OPERATION, data.getOperation());
+
+ if (isImportOperation(data)) {
+ entProcess.setAttribute(SqoopHook.INPUTS, AtlasTypeUtil.getAtlasObjectId(entDbStore));
+ entProcess.setAttribute(SqoopHook.OUTPUTS, AtlasTypeUtil.getAtlasObjectId(entHiveTable));
+ } else {
+ entProcess.setAttribute(SqoopHook.INPUTS, AtlasTypeUtil.getAtlasObjectId(entHiveTable));
+ entProcess.setAttribute(SqoopHook.OUTPUTS, AtlasTypeUtil.getAtlasObjectId(entDbStore));
+ }
+
+ entProcess.setAttribute(SqoopHook.USER, data.getUser());
+ entProcess.setAttribute(SqoopHook.START_TIME, new Date(data.getStartTime()));
+ entProcess.setAttribute(SqoopHook.END_TIME, new Date(data.getEndTime()));
+ entProcess.setAttribute(SqoopHook.CMD_LINE_OPTS, sqoopOptionsMap);
+
+ return entProcess;
+ }
+
+ private boolean isImportOperation(SqoopJobDataPublisher.Data data) {
+ return data.getOperation().toLowerCase().equals("import");
}
static String getSqoopProcessName(Data data, String clusterName) {
- StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(),
- data.getUrl()));
+ StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), data.getUrl()));
+
if (StringUtils.isNotEmpty(data.getStoreTable())) {
name.append(" --table ").append(data.getStoreTable());
}
+
if (StringUtils.isNotEmpty(data.getStoreQuery())) {
name.append(" --query ").append(data.getStoreQuery());
}
- name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s",
- data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+
+ name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+
return name.toString();
}
static String getSqoopDBStoreName(SqoopJobDataPublisher.Data data) {
StringBuilder name = new StringBuilder(String.format("%s --url %s", data.getStoreType(), data.getUrl()));
+
if (StringUtils.isNotEmpty(data.getStoreTable())) {
name.append(" --table ").append(data.getStoreTable());
}
+
if (StringUtils.isNotEmpty(data.getStoreQuery())) {
name.append(" --query ").append(data.getStoreQuery());
}
- return name.toString();
- }
- static boolean isImportOperation(SqoopJobDataPublisher.Data data) {
- return data.getOperation().toLowerCase().equals("import");
- }
-
- @Override
- public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException {
- try {
- Configuration atlasProperties = ApplicationProperties.get();
- String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
-
- Referenceable dbStoreRef = createDBStoreInstance(data);
- Referenceable dbRef = createHiveDatabaseInstance(clusterName, data.getHiveDB());
- Referenceable hiveTableRef = createHiveTableInstance(clusterName, dbRef,
- data.getHiveTable(), data.getHiveDB());
- Referenceable procRef = createSqoopProcessInstance(dbStoreRef, hiveTableRef, data, clusterName);
-
- int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
- HookNotification.HookNotificationMessage message =
- new HookNotification.EntityCreateRequest(AtlasHook.getUser(), dbStoreRef, dbRef, hiveTableRef, procRef);
- AtlasHook.notifyEntities(Arrays.asList(message), maxRetries);
- }
- catch(Exception e) {
- throw new AtlasHookException("SqoopHook.publish() failed.", e);
- }
+ return name.toString();
}
}