You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/01/29 23:40:22 UTC
atlas git commit: ATLAS-2421: updated Atlas notification module to
support V2 data structures; updated HBase hook to use V2 notifications
Repository: atlas
Updated Branches:
refs/heads/master dcdd3d686 -> a02be15d4
ATLAS-2421: updated Atlas notification module to support V2 data structures; updated HBase hook to use V2 notifications
(cherry picked from commit 86c9b19316245c47301e5212552c35d362479fc7)
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/a02be15d
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/a02be15d
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/a02be15d
Branch: refs/heads/master
Commit: a02be15d490939b6a411fede89cd1b1f44696404
Parents: dcdd3d6
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Fri Jan 26 17:05:16 2018 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Mon Jan 29 15:02:23 2018 -0800
----------------------------------------------------------------------
.../atlas/hbase/bridge/HBaseAtlasHook.java | 259 ++++++++++---------
.../apache/atlas/hbase/HBaseAtlasHookIT.java | 87 ++++---
.../java/org/apache/atlas/AtlasClientV2.java | 4 +
.../model/notification/HookNotification.java | 123 ++++++++-
.../org/apache/atlas/type/AtlasTypeUtil.java | 16 ++
.../java/org/apache/atlas/utils/AtlasJson.java | 147 ++++++++---
.../AtlasNotificationMessageDeserializer.java | 2 +-
.../notification/hook/HookNotificationTest.java | 197 +++++++++++++-
.../store/graph/AtlasEntityStore.java | 11 +
.../store/graph/v1/AtlasEntityStoreV1.java | 32 +++
.../notification/NotificationHookConsumer.java | 110 ++++++--
11 files changed, 765 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/a02be15d/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
index 03e340c..f839f87 100644
--- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
+++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
@@ -23,11 +23,15 @@ import org.apache.atlas.AtlasConstants;
import org.apache.atlas.hbase.model.HBaseOperationContext;
import org.apache.atlas.hbase.model.HBaseDataTypes;
import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
-import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
-import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
+import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -43,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -216,113 +221,122 @@ public class HBaseAtlasHook extends AtlasHook {
}
private void createOrUpdateNamespaceInstance(HBaseOperationContext hbaseOperationContext) {
- Referenceable nameSpaceRef = buildNameSpaceRef(hbaseOperationContext);
+ AtlasEntity nameSpace = buildNameSpace(hbaseOperationContext);
switch (hbaseOperationContext.getOperation()) {
case CREATE_NAMESPACE:
- LOG.info("Create NameSpace {}", nameSpaceRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
+ LOG.info("Create NameSpace {}", nameSpace.getAttribute(REFERENCEABLE_ATTRIBUTE_NAME));
- hbaseOperationContext.addMessage(new EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef));
+ hbaseOperationContext.addMessage(new EntityCreateRequestV2(hbaseOperationContext.getUser(), new AtlasEntitiesWithExtInfo(nameSpace)));
break;
case ALTER_NAMESPACE:
- LOG.info("Modify NameSpace {}", nameSpaceRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
+ LOG.info("Modify NameSpace {}", nameSpace.getAttribute(REFERENCEABLE_ATTRIBUTE_NAME));
- hbaseOperationContext.addMessage(new EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef));
+ hbaseOperationContext.addMessage(new EntityUpdateRequestV2(hbaseOperationContext.getUser(), new AtlasEntitiesWithExtInfo(nameSpace)));
break;
}
}
private void deleteNameSpaceInstance(HBaseOperationContext hbaseOperationContext) {
- String nameSpaceQualifiedName = getNameSpaceQualifiedName(clusterName, hbaseOperationContext.getNameSpace());
+ String nameSpaceQName = getNameSpaceQualifiedName(clusterName, hbaseOperationContext.getNameSpace());
+ AtlasObjectId nameSpaceId = new AtlasObjectId(HBaseDataTypes.HBASE_NAMESPACE.getName(), REFERENCEABLE_ATTRIBUTE_NAME, nameSpaceQName);
- LOG.info("Delete NameSpace {}", nameSpaceQualifiedName);
+ LOG.info("Delete NameSpace {}", nameSpaceQName);
- hbaseOperationContext.addMessage(new EntityDeleteRequest(hbaseOperationContext.getUser(),
- HBaseDataTypes.HBASE_NAMESPACE.getName(),
- REFERENCEABLE_ATTRIBUTE_NAME,
- nameSpaceQualifiedName));
+ hbaseOperationContext.addMessage(new EntityDeleteRequestV2(hbaseOperationContext.getUser(), Collections.singletonList(nameSpaceId)));
}
private void createOrUpdateTableInstance(HBaseOperationContext hbaseOperationContext) {
- Referenceable nameSpaceRef = buildNameSpaceRef(hbaseOperationContext);
- Referenceable tableRef = buildTableRef(hbaseOperationContext, nameSpaceRef);
- List<Referenceable> columnFamilyRef = buildColumnFamiliesRef(hbaseOperationContext, nameSpaceRef, tableRef);
+ AtlasEntity nameSpace = buildNameSpace(hbaseOperationContext);
+ AtlasEntity table = buildTable(hbaseOperationContext, nameSpace);
+ List<AtlasEntity> columnFamilies = buildColumnFamilies(hbaseOperationContext, nameSpace, table);
- tableRef.set(ATTR_COLUMNFAMILIES, columnFamilyRef);
+ table.setAttribute(ATTR_COLUMNFAMILIES, AtlasTypeUtil.getAtlasObjectIds(columnFamilies));
+
+ AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(table);
+
+ entities.addReferredEntity(nameSpace);
+
+ if (CollectionUtils.isNotEmpty(columnFamilies)) {
+ for (AtlasEntity columnFamily : columnFamilies) {
+ entities.addReferredEntity(columnFamily);
+ }
+ }
switch (hbaseOperationContext.getOperation()) {
case CREATE_TABLE:
- LOG.info("Create Table {}", tableRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
+ LOG.info("Create Table {}", table.getAttribute(REFERENCEABLE_ATTRIBUTE_NAME));
- hbaseOperationContext.addMessage(new EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef));
+ hbaseOperationContext.addMessage(new EntityCreateRequestV2(hbaseOperationContext.getUser(), entities));
break;
case ALTER_TABLE:
- LOG.info("Modify Table {}", tableRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
+ LOG.info("Modify Table {}", table.getAttribute(REFERENCEABLE_ATTRIBUTE_NAME));
- hbaseOperationContext.addMessage(new EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef));
+ hbaseOperationContext.addMessage(new EntityUpdateRequestV2(hbaseOperationContext.getUser(), entities));
break;
}
}
private void deleteTableInstance(HBaseOperationContext hbaseOperationContext) {
- TableName tableName = hbaseOperationContext.getTableName();
- String tableNameSpace = tableName.getNamespaceAsString();
+ TableName tableName = hbaseOperationContext.getTableName();
+ String nameSpaceName = tableName.getNamespaceAsString();
- if (tableNameSpace == null) {
- tableNameSpace = tableName.getNameWithNamespaceInclAsString();
+ if (nameSpaceName == null) {
+ nameSpaceName = tableName.getNameWithNamespaceInclAsString();
}
- String tableNameStr = tableName.getNameAsString();
- String tableQualifiedName = getTableQualifiedName(clusterName, tableNameSpace, tableNameStr);
+ String tableNameStr = tableName.getNameAsString();
+ String tableQName = getTableQualifiedName(clusterName, nameSpaceName, tableNameStr);
+ AtlasObjectId tableId = new AtlasObjectId(HBaseDataTypes.HBASE_TABLE.getName(), REFERENCEABLE_ATTRIBUTE_NAME, tableQName);
- LOG.info("Delete Table {}", tableQualifiedName);
+ LOG.info("Delete Table {}", tableQName);
- hbaseOperationContext.addMessage(new EntityDeleteRequest(hbaseOperationContext.getUser(),
- HBaseDataTypes.HBASE_TABLE.getName(),
- REFERENCEABLE_ATTRIBUTE_NAME,
- tableQualifiedName));
+ hbaseOperationContext.addMessage(new EntityDeleteRequestV2(hbaseOperationContext.getUser(), Collections.singletonList(tableId)));
}
private void createOrUpdateColumnFamilyInstance(HBaseOperationContext hbaseOperationContext) {
- Referenceable nameSpaceRef = buildNameSpaceRef(hbaseOperationContext);
- Referenceable tableRef = buildTableRef(hbaseOperationContext, nameSpaceRef);
- Referenceable columnFamilyRef = buildColumnFamilyRef(hbaseOperationContext, hbaseOperationContext.gethColumnDescriptor(), nameSpaceRef, tableRef);
+ AtlasEntity nameSpace = buildNameSpace(hbaseOperationContext);
+ AtlasEntity table = buildTable(hbaseOperationContext, nameSpace);
+ AtlasEntity columnFamily = buildColumnFamily(hbaseOperationContext, hbaseOperationContext.gethColumnDescriptor(), nameSpace, table);
+
+ AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(columnFamily);
+
+ entities.addReferredEntity(nameSpace);
+ entities.addReferredEntity(table);
switch (hbaseOperationContext.getOperation()) {
case CREATE_COLUMN_FAMILY:
- LOG.info("Create ColumnFamily {}", columnFamilyRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
+ LOG.info("Create ColumnFamily {}", columnFamily.getAttribute(REFERENCEABLE_ATTRIBUTE_NAME));
- hbaseOperationContext.addMessage(new EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef, columnFamilyRef));
+ hbaseOperationContext.addMessage(new EntityCreateRequestV2(hbaseOperationContext.getUser(), entities));
break;
case ALTER_COLUMN_FAMILY:
- LOG.info("Alter ColumnFamily {}", columnFamilyRef.get(REFERENCEABLE_ATTRIBUTE_NAME));
+ LOG.info("Alter ColumnFamily {}", columnFamily.getAttribute(REFERENCEABLE_ATTRIBUTE_NAME));
- hbaseOperationContext.addMessage(new EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef, columnFamilyRef));
+ hbaseOperationContext.addMessage(new EntityUpdateRequestV2(hbaseOperationContext.getUser(), entities));
break;
}
}
private void deleteColumnFamilyInstance(HBaseOperationContext hbaseOperationContext) {
- TableName tableName = hbaseOperationContext.getTableName();
- String tableNameSpace = tableName.getNamespaceAsString();
+ TableName tableName = hbaseOperationContext.getTableName();
+ String nameSpaceName = tableName.getNamespaceAsString();
- if (tableNameSpace == null) {
- tableNameSpace = tableName.getNameWithNamespaceInclAsString();
+ if (nameSpaceName == null) {
+ nameSpaceName = tableName.getNameWithNamespaceInclAsString();
}
- String tableNameStr = tableName.getNameAsString();
- String columnFamilyName = hbaseOperationContext.getColummFamily();
- String columnFamilyQualifiedName = getColumnFamilyQualifiedName(clusterName, tableNameSpace, tableNameStr, columnFamilyName);
+ String tableNameStr = tableName.getNameAsString();
+ String columnFamilyName = hbaseOperationContext.getColummFamily();
+ String columnFamilyQName = getColumnFamilyQualifiedName(clusterName, nameSpaceName, tableNameStr, columnFamilyName);
+ AtlasObjectId columnFamilyId = new AtlasObjectId(HBaseDataTypes.HBASE_COLUMN_FAMILY.getName(), REFERENCEABLE_ATTRIBUTE_NAME, columnFamilyQName);
- LOG.info("Delete ColumnFamily {}", columnFamilyQualifiedName);
+ LOG.info("Delete ColumnFamily {}", columnFamilyQName);
- hbaseOperationContext.addMessage(new EntityDeleteRequest(hbaseOperationContext.getUser(),
- HBaseDataTypes.HBASE_COLUMN_FAMILY.getName(),
- REFERENCEABLE_ATTRIBUTE_NAME,
- columnFamilyQualifiedName));
+ hbaseOperationContext.addMessage(new EntityDeleteRequestV2(hbaseOperationContext.getUser(), Collections.singletonList(columnFamilyId)));
}
@@ -366,127 +380,123 @@ public class HBaseAtlasHook extends AtlasHook {
return tableName.substring(tableName.indexOf(":") + 1);
}
- private Referenceable buildNameSpaceRef(HBaseOperationContext hbaseOperationContext) {
- Referenceable nameSpaceRef = new Referenceable(HBaseDataTypes.HBASE_NAMESPACE.getName());
-
- String nameSpace = null;
-
+ private AtlasEntity buildNameSpace(HBaseOperationContext hbaseOperationContext) {
+ AtlasEntity nameSpace = new AtlasEntity(HBaseDataTypes.HBASE_NAMESPACE.getName());
NamespaceDescriptor nameSpaceDesc = hbaseOperationContext.getNamespaceDescriptor();
+ String nameSpaceName = nameSpaceDesc == null ? null : hbaseOperationContext.getNamespaceDescriptor().getName();
- if (nameSpaceDesc != null) {
- nameSpace = hbaseOperationContext.getNamespaceDescriptor().getName();
- }
-
- if (nameSpace == null) {
- nameSpace = hbaseOperationContext.getNameSpace();
+ if (nameSpaceName == null) {
+ nameSpaceName = hbaseOperationContext.getNameSpace();
}
- nameSpaceRef.set(ATTR_NAME, nameSpace);
- nameSpaceRef.set(REFERENCEABLE_ATTRIBUTE_NAME, getNameSpaceQualifiedName(clusterName, nameSpace));
- nameSpaceRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
- nameSpaceRef.set(ATTR_DESCRIPTION, nameSpace);
- nameSpaceRef.set(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf());
- nameSpaceRef.set(ATTR_OWNER, hbaseOperationContext.getOwner());
-
Date now = new Date(System.currentTimeMillis());
+ nameSpace.setAttribute(ATTR_NAME, nameSpaceName);
+ nameSpace.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, getNameSpaceQualifiedName(clusterName, nameSpaceName));
+ nameSpace.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+ nameSpace.setAttribute(ATTR_DESCRIPTION, nameSpaceName);
+ nameSpace.setAttribute(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf());
+ nameSpace.setAttribute(ATTR_OWNER, hbaseOperationContext.getOwner());
+ nameSpace.setAttribute(ATTR_MODIFIED_TIME, now);
+
if (OPERATION.CREATE_NAMESPACE.equals(hbaseOperationContext.getOperation())) {
- nameSpaceRef.set(ATTR_CREATE_TIME, now);
- nameSpaceRef.set(ATTR_MODIFIED_TIME, now);
- } else {
- nameSpaceRef.set(ATTR_MODIFIED_TIME, now);
+ nameSpace.setAttribute(ATTR_CREATE_TIME, now);
}
- return nameSpaceRef;
+ return nameSpace;
}
- private Referenceable buildTableRef(HBaseOperationContext hbaseOperationContext, Referenceable nameSpaceRef) {
- Referenceable tableRef = new Referenceable(HBaseDataTypes.HBASE_TABLE.getName());
- String tableName = getTableName(hbaseOperationContext);
- String tableNameSpace = hbaseOperationContext.getNameSpace();
- String tableQualifiedName = getTableQualifiedName(clusterName, tableNameSpace, tableName);
- OPERATION operation = hbaseOperationContext.getOperation();
- Date now = new Date(System.currentTimeMillis());
+ private AtlasEntity buildTable(HBaseOperationContext hbaseOperationContext, AtlasEntity nameSpace) {
+ AtlasEntity table = new AtlasEntity(HBaseDataTypes.HBASE_TABLE.getName());
+ String tableName = getTableName(hbaseOperationContext);
+ String nameSpaceName = (String) nameSpace.getAttribute(ATTR_NAME);
+ String tableQName = getTableQualifiedName(clusterName, nameSpaceName, tableName);
+ OPERATION operation = hbaseOperationContext.getOperation();
+ Date now = new Date(System.currentTimeMillis());
- tableRef.set(REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
- tableRef.set(ATTR_NAME, tableName);
- tableRef.set(ATTR_URI, tableName);
- tableRef.set(ATTR_OWNER, hbaseOperationContext.getOwner());
- tableRef.set(ATTR_DESCRIPTION, tableName);
- tableRef.set(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf());
+ table.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, tableQName);
+ table.setAttribute(ATTR_NAME, tableName);
+ table.setAttribute(ATTR_URI, tableName);
+ table.setAttribute(ATTR_OWNER, hbaseOperationContext.getOwner());
+ table.setAttribute(ATTR_DESCRIPTION, tableName);
+ table.setAttribute(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf());
+ table.setAttribute(ATTR_NAMESPACE, AtlasTypeUtil.getAtlasObjectId(nameSpace));
switch (operation) {
case CREATE_TABLE:
- tableRef.set(ATTR_NAMESPACE, nameSpaceRef);
- tableRef.set(ATTR_CREATE_TIME, now);
- tableRef.set(ATTR_MODIFIED_TIME, now);
+ table.setAttribute(ATTR_CREATE_TIME, now);
+ table.setAttribute(ATTR_MODIFIED_TIME, now);
break;
case ALTER_TABLE:
- tableRef.set(ATTR_NAMESPACE, nameSpaceRef);
- tableRef.set(ATTR_MODIFIED_TIME, now);
+ table.setAttribute(ATTR_MODIFIED_TIME, now);
break;
default:
- tableRef.set(ATTR_NAMESPACE, nameSpaceRef.getId());
break;
}
- return tableRef;
+ return table;
}
- private List<Referenceable> buildColumnFamiliesRef(HBaseOperationContext hbaseOperationContext, Referenceable nameSpaceRef, Referenceable tableRef) {
- List<Referenceable> entities = new ArrayList<>();
-
+ private List<AtlasEntity> buildColumnFamilies(HBaseOperationContext hbaseOperationContext, AtlasEntity nameSpace, AtlasEntity table) {
+ List<AtlasEntity> columnFamilies = new ArrayList<>();
HColumnDescriptor[] hColumnDescriptors = hbaseOperationContext.gethColumnDescriptors();
if (hColumnDescriptors != null) {
for (HColumnDescriptor hColumnDescriptor : hColumnDescriptors) {
- Referenceable columnFamilyRef = buildColumnFamilyRef(hbaseOperationContext, hColumnDescriptor, nameSpaceRef, tableRef);
+ AtlasEntity columnFamily = buildColumnFamily(hbaseOperationContext, hColumnDescriptor, nameSpace, table);
- entities.add(columnFamilyRef);
+ columnFamilies.add(columnFamily);
}
}
- return entities;
+ return columnFamilies;
}
- private Referenceable buildColumnFamilyRef(HBaseOperationContext hbaseOperationContext, HColumnDescriptor hColumnDescriptor, Referenceable nameSpaceRef, Referenceable tableReference) {
- Referenceable columnFamilyRef = new Referenceable(HBaseDataTypes.HBASE_COLUMN_FAMILY.getName());
- String columnFamilyName = hColumnDescriptor.getNameAsString();
- String tableName = (String) tableReference.get(ATTR_NAME);
- String namespace = (String) nameSpaceRef.get(ATTR_NAME);
+ private AtlasEntity buildColumnFamily(HBaseOperationContext hbaseOperationContext, HColumnDescriptor hColumnDescriptor, AtlasEntity nameSpace, AtlasEntity table) {
+ AtlasEntity columnFamily = new AtlasEntity(HBaseDataTypes.HBASE_COLUMN_FAMILY.getName());
+ String columnFamilyName = hColumnDescriptor.getNameAsString();
+ String tableName = (String) table.getAttribute(ATTR_NAME);
+ String nameSpaceName = (String) nameSpace.getAttribute(ATTR_NAME);
+ String columnFamilyQName = getColumnFamilyQualifiedName(clusterName, nameSpaceName, tableName, columnFamilyName);
+ Date now = new Date(System.currentTimeMillis());
- String columnFamilyQualifiedName = getColumnFamilyQualifiedName(clusterName, namespace, tableName, columnFamilyName);
-
- columnFamilyRef.set(ATTR_NAME, columnFamilyName);
- columnFamilyRef.set(ATTR_DESCRIPTION, columnFamilyName);
- columnFamilyRef.set(REFERENCEABLE_ATTRIBUTE_NAME, columnFamilyQualifiedName);
- columnFamilyRef.set(ATTR_OWNER, hbaseOperationContext.getOwner());
-
- Date now = new Date(System.currentTimeMillis());
+ columnFamily.setAttribute(ATTR_NAME, columnFamilyName);
+ columnFamily.setAttribute(ATTR_DESCRIPTION, columnFamilyName);
+ columnFamily.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, columnFamilyQName);
+ columnFamily.setAttribute(ATTR_OWNER, hbaseOperationContext.getOwner());
+ columnFamily.setAttribute(ATTR_TABLE, AtlasTypeUtil.getAtlasObjectId(table));
switch (hbaseOperationContext.getOperation()) {
case CREATE_COLUMN_FAMILY:
- columnFamilyRef.set(ATTR_TABLE, tableReference);
- columnFamilyRef.set(ATTR_CREATE_TIME, now);
- columnFamilyRef.set(ATTR_MODIFIED_TIME, now);
+ columnFamily.setAttribute(ATTR_CREATE_TIME, now);
+ columnFamily.setAttribute(ATTR_MODIFIED_TIME, now);
break;
case ALTER_COLUMN_FAMILY:
- columnFamilyRef.set(ATTR_TABLE, tableReference);
- columnFamilyRef.set(ATTR_MODIFIED_TIME, now);
+ columnFamily.setAttribute(ATTR_MODIFIED_TIME, now);
break;
default:
- columnFamilyRef.set(ATTR_TABLE, tableReference.getId());
+ break;
}
- return columnFamilyRef;
+ return columnFamily;
}
private String getTableName(HBaseOperationContext hbaseOperationContext) {
- HTableDescriptor tableDescriptor = hbaseOperationContext.gethTableDescriptor();
+ final String ret;
- return (tableDescriptor != null) ? tableDescriptor.getNameAsString() : null;
+ TableName tableName = hbaseOperationContext.getTableName();
+
+ if (tableName != null) {
+ ret = tableName.getNameAsString();
+ } else {
+ HTableDescriptor tableDescriptor = hbaseOperationContext.gethTableDescriptor();
+
+ ret = (tableDescriptor != null) ? tableDescriptor.getNameAsString() : null;
+ }
+
+ return ret;
}
private void notifyAsPrivilegedAction(final HBaseOperationContext hbaseOperationContext) {
@@ -496,7 +506,6 @@ public class HBaseAtlasHook extends AtlasHook {
final List<HookNotification> messages = hbaseOperationContext.getMessages();
-
try {
PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() {
@Override
http://git-wip-us.apache.org/repos/asf/atlas/blob/a02be15d/addons/hbase-bridge/src/test/java/org/apache/atlas/hbase/HBaseAtlasHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hbase-bridge/src/test/java/org/apache/atlas/hbase/HBaseAtlasHookIT.java b/addons/hbase-bridge/src/test/java/org/apache/atlas/hbase/HBaseAtlasHookIT.java
index 0d2e8df..db5c78e 100644
--- a/addons/hbase-bridge/src/test/java/org/apache/atlas/hbase/HBaseAtlasHookIT.java
+++ b/addons/hbase-bridge/src/test/java/org/apache/atlas/hbase/HBaseAtlasHookIT.java
@@ -20,9 +20,11 @@ package org.apache.atlas.hbase;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.hbase.bridge.HBaseAtlasHook;
import org.apache.atlas.hbase.model.HBaseDataTypes;
-import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.ParamChecker;
import org.apache.hadoop.conf.Configuration;
@@ -40,6 +42,7 @@ import org.testng.annotations.Test;
import java.io.IOException;
import java.net.ServerSocket;
+import java.util.Collections;
import java.util.Iterator;
import static org.testng.Assert.assertNotNull;
@@ -47,12 +50,13 @@ import static org.testng.Assert.fail;
public class HBaseAtlasHookIT {
- private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasHookIT.class);
- protected static final String DGI_URL = "http://localhost:31000/";
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasHookIT.class);
+ protected static final String ATLAS_URL = "http://localhost:31000/";
protected static final String CLUSTER_NAME = "primary";
- private static HBaseTestingUtility utility;
- private static int port;
- private static AtlasClient atlasClient;
+
+ private HBaseTestingUtility utility;
+ private int port;
+ private AtlasClientV2 atlasClient;
@BeforeClass
@@ -65,36 +69,42 @@ public class HBaseAtlasHookIT {
}
}
-
@AfterClass
- public static void cleanup() throws Exception {
- LOG.info(" Stopping mini cluster.. ");
+ public void cleanup() throws Exception {
+ LOG.info("Stopping mini cluster.. ");
utility.shutdownMiniCluster();
}
@Test
public void testCreateNamesapce() throws Exception {
final Configuration conf = HBaseConfiguration.create();
+
conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.clientPort", String.valueOf(port));
conf.set("zookeeper.znode.parent", "/hbase-unsecure");
+
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
NamespaceDescriptor ns = NamespaceDescriptor.create("test_namespace").build();
+
admin.createNamespace(ns);
- String nameSpace = assertNameSpaceIsRegistered(ns.getName());
+
//assert on qualified name
- Referenceable nameSpaceRef = getAtlasClient().getEntity(nameSpace);
- String nameSpaceQualifiedName = HBaseAtlasHook.getNameSpaceQualifiedName(CLUSTER_NAME, ns.getName());
- Assert.assertEquals(nameSpaceRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), nameSpaceQualifiedName);
+ String nameSpace = assertNameSpaceIsRegistered(ns.getName());
+ AtlasEntityWithExtInfo nameSpaceRef = getAtlasClient().getEntityByGuid(nameSpace);
+ String nameSpaceQualifiedName = HBaseAtlasHook.getNameSpaceQualifiedName(CLUSTER_NAME, ns.getName());
+
+ Assert.assertEquals(nameSpaceRef.getEntity().getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), nameSpaceQualifiedName);
}
@Test
public void testCreateTable() throws Exception {
final Configuration conf = HBaseConfiguration.create();
+
conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.clientPort", String.valueOf(port));
conf.set("zookeeper.znode.parent", "/hbase-unsecure");
+
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
String namespace = "test_namespace1";
@@ -103,27 +113,35 @@ public class HBaseAtlasHookIT {
// Create a table
if (!admin.tableExists(TableName.valueOf(namespace, tablename))) {
NamespaceDescriptor ns = NamespaceDescriptor.create(namespace).build();
+
admin.createNamespace(ns);
+
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(namespace, tablename));
+
tableDescriptor.addFamily(new HColumnDescriptor("colfam1"));
+
admin.createTable(tableDescriptor);
}
- String table = assertTableIsRegistered(namespace, tablename);
+
//assert on qualified name
- Referenceable tableRef = getAtlasClient().getEntity(table);
- String entityName = HBaseAtlasHook.getTableQualifiedName(CLUSTER_NAME, namespace, tablename);
- Assert.assertEquals(tableRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), entityName);
+ String table = assertTableIsRegistered(namespace, tablename);
+ AtlasEntityWithExtInfo tableRef = getAtlasClient().getEntityByGuid(table);
+ String entityName = HBaseAtlasHook.getTableQualifiedName(CLUSTER_NAME, namespace, tablename);
+
+ Assert.assertEquals(tableRef.getEntity().getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), entityName);
}
// Methods for creating HBase
- public static void createAtlasClient() {
+ private void createAtlasClient() {
try {
org.apache.commons.configuration.Configuration configuration = ApplicationProperties.get();
String[] atlasEndPoint = configuration.getStringArray(HBaseAtlasHook.ATTR_ATLAS_ENDPOINT);
+
configuration.setProperty("atlas.cluster.name", CLUSTER_NAME);
+
if (atlasEndPoint == null || atlasEndPoint.length == 0) {
- atlasEndPoint = new String[]{DGI_URL};
+ atlasEndPoint = new String[]{ATLAS_URL};
}
Iterator<String> keys = configuration.getKeys();
@@ -133,12 +151,10 @@ public class HBaseAtlasHookIT {
}
if (AuthenticationUtil.isKerberosAuthenticationEnabled()) {
- atlasClient = new AtlasClient(configuration, atlasEndPoint);
+ atlasClient = new AtlasClientV2(configuration, atlasEndPoint, null);
} else {
- atlasClient = new AtlasClient(configuration, atlasEndPoint, new String[]{"admin", "admin"});
+ atlasClient = new AtlasClientV2(configuration, atlasEndPoint, new String[]{"admin", "admin"});
}
-
-
} catch (Exception e) {
LOG.error("Unable to create AtlasClient for Testing ", e);
}
@@ -147,15 +163,18 @@ public class HBaseAtlasHookIT {
private static int getFreePort() throws IOException {
ServerSocket serverSocket = new ServerSocket(0);
int port = serverSocket.getLocalPort();
+
serverSocket.close();
+
return port;
}
- public static void createHBaseCluster() throws Exception {
+ private void createHBaseCluster() throws Exception {
LOG.info("Creating Hbase Admin...");
- port = getFreePort();
+ port = getFreePort();
utility = new HBaseTestingUtility();
+
utility.getConfiguration().set("test.hbase.zookeeper.property.clientPort", String.valueOf(port));
utility.getConfiguration().set("hbase.master.port", String.valueOf(getFreePort()));
utility.getConfiguration().set("hbase.master.info.port", String.valueOf(getFreePort()));
@@ -170,8 +189,8 @@ public class HBaseAtlasHookIT {
}
- public AtlasClient getAtlasClient() {
- AtlasClient ret = null;
+ public AtlasClientV2 getAtlasClient() {
+ AtlasClientV2 ret = null;
if (atlasClient != null) {
ret = atlasClient;
}
@@ -205,7 +224,7 @@ public class HBaseAtlasHookIT {
}
public interface AssertPredicate {
- void assertOnEntity(Referenceable entity) throws Exception;
+ void assertOnEntity(AtlasEntity entity) throws Exception;
}
public interface Predicate {
@@ -224,15 +243,19 @@ public class HBaseAtlasHookIT {
waitFor(80000, new HBaseAtlasHookIT.Predicate() {
@Override
public void evaluate() throws Exception {
- Referenceable entity = atlasClient.getEntity(typeName, property, value);
+ AtlasEntityWithExtInfo entity = atlasClient.getEntityByAttribute(typeName, Collections.singletonMap(property, value));
+
assertNotNull(entity);
+
if (assertPredicate != null) {
- assertPredicate.assertOnEntity(entity);
+ assertPredicate.assertOnEntity(entity.getEntity());
}
}
});
- Referenceable entity = atlasClient.getEntity(typeName, property, value);
- return entity.getId()._getId();
+
+ AtlasEntityWithExtInfo entity = atlasClient.getEntityByAttribute(typeName, Collections.singletonMap(property, value));
+
+ return entity.getEntity().getGuid();
}
/**
http://git-wip-us.apache.org/repos/asf/atlas/blob/a02be15d/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
----------------------------------------------------------------------
diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 42003bc..92ad923 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -106,6 +106,10 @@ public class AtlasClientV2 extends AtlasBaseClient {
super(baseUrl, cookie);
}
+ public AtlasClientV2(Configuration configuration, String[] baseUrl, String[] basicAuthUserNamePassword) {
+ super(configuration, baseUrl, basicAuthUserNamePassword);
+ }
+
@VisibleForTesting
AtlasClientV2(WebResource service, Configuration configuration) {
super(service, configuration);
http://git-wip-us.apache.org/repos/asf/atlas/blob/a02be15d/intg/src/main/java/org/apache/atlas/model/notification/HookNotification.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/HookNotification.java b/intg/src/main/java/org/apache/atlas/model/notification/HookNotification.java
index 83e52d5..5b5fa04 100644
--- a/intg/src/main/java/org/apache/atlas/model/notification/HookNotification.java
+++ b/intg/src/main/java/org/apache/atlas/model/notification/HookNotification.java
@@ -21,12 +21,18 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+
import org.apache.commons.lang.StringUtils;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
+import java.util.List;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@@ -48,7 +54,8 @@ public class HookNotification implements Serializable {
* Type of the hook message.
*/
public enum HookNotificationType {
- TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE
+ TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE,
+ ENTITY_CREATE_V2, ENTITY_PARTIAL_UPDATE_V2, ENTITY_FULL_UPDATE_V2, ENTITY_DELETE_V2
}
protected HookNotificationType type;
@@ -101,4 +108,118 @@ public class HookNotification implements Serializable {
return sb;
}
+
+ /**
+  * Hook notification that carries V2 entities ({@link AtlasEntitiesWithExtInfo}).
+  *
+  * The notification type is always {@link HookNotificationType#ENTITY_CREATE_V2}.
+  */
+ @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+ @JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
+ @JsonIgnoreProperties(ignoreUnknown=true)
+ @XmlRootElement
+ @XmlAccessorType(XmlAccessType.PROPERTY)
+ public static class EntityCreateRequestV2 extends HookNotification implements Serializable {
+     // explicit UID so the serialized form does not depend on a compiler-generated value
+     private static final long serialVersionUID = 1L;
+
+     private AtlasEntitiesWithExtInfo entities;
+
+     // not for application code; presumably required by the JSON/XML binding - TODO confirm
+     private EntityCreateRequestV2() {
+     }
+
+     /**
+      * @param user     user on whose behalf the notification is sent
+      * @param entities entities carried by this notification; stored as given (no copy)
+      */
+     public EntityCreateRequestV2(String user, AtlasEntitiesWithExtInfo entities) {
+         super(HookNotificationType.ENTITY_CREATE_V2, user);
+
+         this.entities = entities;
+     }
+
+     /**
+      * @return the entities carried by this notification; may be null
+      */
+     public AtlasEntitiesWithExtInfo getEntities() {
+         return entities;
+     }
+
+     /**
+      * @return the string form of the entities, or the literal text "null" when none are set
+      */
+     @Override
+     public String toString() {
+         return entities == null ? "null" : entities.toString();
+     }
+ }
+
+ /**
+  * Hook notification that carries V2 entities ({@link AtlasEntitiesWithExtInfo}).
+  *
+  * The notification type is always {@link HookNotificationType#ENTITY_FULL_UPDATE_V2}.
+  */
+ @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+ @JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
+ @JsonIgnoreProperties(ignoreUnknown=true)
+ @XmlRootElement
+ @XmlAccessorType(XmlAccessType.PROPERTY)
+ public static class EntityUpdateRequestV2 extends HookNotification implements Serializable {
+     // explicit UID so the serialized form does not depend on a compiler-generated value
+     private static final long serialVersionUID = 1L;
+
+     private AtlasEntitiesWithExtInfo entities;
+
+     // not for application code; presumably required by the JSON/XML binding - TODO confirm
+     private EntityUpdateRequestV2() {
+     }
+
+     /**
+      * @param user     user on whose behalf the notification is sent
+      * @param entities entities carried by this notification; stored as given (no copy)
+      */
+     public EntityUpdateRequestV2(String user, AtlasEntitiesWithExtInfo entities) {
+         super(HookNotificationType.ENTITY_FULL_UPDATE_V2, user);
+
+         this.entities = entities;
+     }
+
+     /**
+      * @return the entities carried by this notification; may be null
+      */
+     public AtlasEntitiesWithExtInfo getEntities() {
+         return entities;
+     }
+
+     /**
+      * @return the string form of the entities, or the literal text "null" when none are set
+      */
+     @Override
+     public String toString() {
+         return entities == null ? "null" : entities.toString();
+     }
+ }
+
+ /**
+  * Hook notification that carries an {@link AtlasObjectId} together with an
+  * {@link AtlasEntityWithExtInfo}.
+  *
+  * The notification type is always {@link HookNotificationType#ENTITY_PARTIAL_UPDATE_V2}.
+  * NOTE(review): presumably entityId identifies the entity to update and entity holds the
+  * new attribute values - confirm against the consumer of this message.
+  */
+ @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+ @JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
+ @JsonIgnoreProperties(ignoreUnknown=true)
+ @XmlRootElement
+ @XmlAccessorType(XmlAccessType.PROPERTY)
+ public static class EntityPartialUpdateRequestV2 extends HookNotification implements Serializable {
+     // explicit UID so the serialized form does not depend on a compiler-generated value
+     private static final long serialVersionUID = 1L;
+
+     private AtlasObjectId entityId;
+     private AtlasEntityWithExtInfo entity;
+
+     // not for application code; presumably required by the JSON/XML binding - TODO confirm
+     private EntityPartialUpdateRequestV2() {
+     }
+
+     /**
+      * @param user     user on whose behalf the notification is sent
+      * @param entityId object-id carried by this notification; stored as given
+      * @param entity   entity information carried by this notification; stored as given
+      */
+     public EntityPartialUpdateRequestV2(String user, AtlasObjectId entityId, AtlasEntityWithExtInfo entity) {
+         super(HookNotificationType.ENTITY_PARTIAL_UPDATE_V2, user);
+
+         this.entityId = entityId;
+         this.entity = entity;
+     }
+
+     /**
+      * @return the object-id carried by this notification; may be null
+      */
+     public AtlasObjectId getEntityId() {
+         return entityId;
+     }
+
+     /**
+      * @return the entity information carried by this notification; may be null
+      */
+     public AtlasEntityWithExtInfo getEntity() {
+         return entity;
+     }
+
+     /**
+      * @return text of the form "entityId=...; entity=..."
+      */
+     @Override
+     public String toString() {
+         return "entityId=" + entityId + "; entity=" + entity;
+     }
+ }
+
+ /**
+  * Hook notification that carries a list of {@link AtlasObjectId}s.
+  *
+  * The notification type is always {@link HookNotificationType#ENTITY_DELETE_V2}.
+  */
+ @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+ @JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
+ @JsonIgnoreProperties(ignoreUnknown=true)
+ @XmlRootElement
+ @XmlAccessorType(XmlAccessType.PROPERTY)
+ public static class EntityDeleteRequestV2 extends HookNotification implements Serializable {
+     // explicit UID so the serialized form does not depend on a compiler-generated value
+     private static final long serialVersionUID = 1L;
+
+     private List<AtlasObjectId> entities;
+
+     // not for application code; presumably required by the JSON/XML binding - TODO confirm
+     private EntityDeleteRequestV2() {
+     }
+
+     /**
+      * @param user     user on whose behalf the notification is sent
+      * @param entities object-ids carried by this notification; the list is stored as given (no copy)
+      */
+     public EntityDeleteRequestV2(String user, List<AtlasObjectId> entities) {
+         super(HookNotificationType.ENTITY_DELETE_V2, user);
+
+         this.entities = entities;
+     }
+
+     /**
+      * @return the object-ids carried by this notification; may be null
+      */
+     public List<AtlasObjectId> getEntities() {
+         return entities;
+     }
+
+     /**
+      * @return the string form of the list, or the literal text "null" when no list is set
+      */
+     @Override
+     public String toString() {
+         return entities == null ? "null" : entities.toString();
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/a02be15d/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
index 7b77a73..1b6af94 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
@@ -347,6 +347,22 @@ public class AtlasTypeUtil {
return new AtlasObjectId(header.getGuid(), header.getTypeName());
}
+ /**
+  * Converts each entity of the given list with {@code getAtlasObjectId(entity)}.
+  *
+  * @param entities entities to convert; may be null or empty
+  * @return a new mutable list with one object-id per entity, in input order; empty for null/empty input
+  */
+ public static List<AtlasObjectId> getAtlasObjectIds(List<AtlasEntity> entities) {
+     // nothing to convert: hand back a fresh empty list rather than null
+     if (!CollectionUtils.isNotEmpty(entities)) {
+         return new ArrayList<>();
+     }
+
+     final List<AtlasObjectId> objectIds = new ArrayList<>(entities.size());
+
+     for (AtlasEntity current : entities) {
+         objectIds.add(getAtlasObjectId(current));
+     }
+
+     return objectIds;
+ }
+
public static boolean isValidGuid(AtlasObjectId objId) {
return isValidGuid(objId.getGuid());
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/a02be15d/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java b/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java
index adf0665..4f7b716 100644
--- a/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java
+++ b/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java
@@ -29,6 +29,10 @@ import org.apache.atlas.model.notification.EntityNotification;
import org.apache.atlas.model.notification.EntityNotification.EntityNotificationType;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
+import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.v1.model.instance.AtlasSystemAttributes;
import org.apache.atlas.v1.model.instance.Id;
@@ -65,12 +69,23 @@ public class AtlasJson {
static {
SimpleModule atlasSerDeModule = new SimpleModule("AtlasSerDe", new Version(1, 0, 0, null));
- atlasSerDeModule.addSerializer(Date.class, new DateSerializer());
- atlasSerDeModule.addDeserializer(Date.class, new DateDeserializer());
+ atlasSerDeModule.addSerializer(Referenceable.class, new ReferenceableSerializer());
+ atlasSerDeModule.addDeserializer(Referenceable.class, new ReferenceableDeserializer());
+ atlasSerDeModule.addSerializer(Struct.class, new StructSerializer());
+ atlasSerDeModule.addDeserializer(Struct.class, new StructDeserializer());
+ atlasSerDeModule.addSerializer(Id.class, new IdSerializer());
+ atlasSerDeModule.addDeserializer(Id.class, new IdDeserializer());
atlasSerDeModule.addDeserializer(HookNotification.class, new HookNotificationDeserializer());
atlasSerDeModule.addDeserializer(EntityNotification.class, new EntityNotificationDeserializer());
- mapperV1.registerModule(atlasSerDeModule);
+ mapper.registerModule(atlasSerDeModule);
+
+ SimpleModule atlasSerDeV1Module = new SimpleModule("AtlasSerDeV1", new Version(1, 0, 0, null));
+
+ atlasSerDeV1Module.addSerializer(Date.class, new DateSerializer());
+ atlasSerDeV1Module.addDeserializer(Date.class, new DateDeserializer());
+
+ mapperV1.registerModule(atlasSerDeV1Module);
SimpleModule searchResultV1SerDeModule = new SimpleModule("SearchResultV1SerDe", new Version(1, 0, 0, null));
@@ -90,7 +105,7 @@ public class AtlasJson {
if (obj instanceof JsonNode && ((JsonNode) obj).isTextual()) {
ret = ((JsonNode) obj).textValue();
} else {
- ret = mapperV1.writeValueAsString(obj);
+ ret = mapper.writeValueAsString(obj);
}
}catch (IOException e){
LOG.error("AtlasJson.toJson()", e);
@@ -106,6 +121,10 @@ public class AtlasJson {
if (jsonStr != null) {
try {
ret = mapper.readValue(jsonStr, type);
+
+ if (ret instanceof Struct) {
+ ((Struct) ret).normalize();
+ }
} catch (IOException e) {
LOG.error("AtlasType.fromJson()", e);
@@ -116,34 +135,18 @@ public class AtlasJson {
return ret;
}
- public static String toV1Json(Object obj) {
- String ret;
- try {
- if (obj instanceof JsonNode && ((JsonNode) obj).isTextual()) {
- ret = ((JsonNode) obj).textValue();
- } else {
- ret = mapperV1.writeValueAsString(obj);
- }
- } catch (IOException e) {
- LOG.error("AtlasType.toV1Json()", e);
-
- ret = null;
- }
- return ret;
- }
-
- public static <T> T fromV1Json(String jsonStr, Class<T> type) {
+ public static <T> T fromJson(String jsonStr, TypeReference<T> type) {
T ret = null;
if (jsonStr != null) {
try {
- ret = mapperV1.readValue(jsonStr, type);
+ ret = mapper.readValue(jsonStr, type);
if (ret instanceof Struct) {
((Struct) ret).normalize();
}
} catch (IOException e) {
- LOG.error("AtlasType.fromV1Json()", e);
+ LOG.error("AtlasType.fromJson()", e);
ret = null;
}
@@ -152,20 +155,16 @@ public class AtlasJson {
return ret;
}
- public static <T> T fromV1Json(String jsonStr, TypeReference<T> type) {
- T ret = null;
-
- if (jsonStr != null) {
- try {
- ret = mapperV1.readValue(jsonStr, type);
- } catch (IOException e) {
- LOG.error("AtlasType.toV1Json()", e);
+ public static String toV1Json(Object obj) {
+ return toJson(obj);
+ }
- ret = null;
- }
- }
+ public static <T> T fromV1Json(String jsonStr, Class<T> type) {
+ return fromJson(jsonStr, type);
+ }
- return ret;
+ public static <T> T fromV1Json(String jsonStr, TypeReference<T> type) {
+ return fromJson(jsonStr, type);
}
public static String toV1SearchJson(Object obj) {
@@ -198,7 +197,7 @@ public class AtlasJson {
}
public static ArrayNode createV1ArrayNode(Collection<?> array) {
- ArrayNode ret = mapperV1.createArrayNode();
+ ArrayNode ret = mapper.createArrayNode();
for (Object elem : array) {
ret.addPOJO(elem);
@@ -209,13 +208,13 @@ public class AtlasJson {
public static JsonNode parseToV1JsonNode(String json) throws IOException {
- JsonNode jsonNode = mapperV1.readTree(json);
+ JsonNode jsonNode = mapper.readTree(json);
return jsonNode;
}
public static ArrayNode parseToV1ArrayNode(String json) throws IOException {
- JsonNode jsonNode = mapperV1.readTree(json);
+ JsonNode jsonNode = mapper.readTree(json);
if (jsonNode instanceof ArrayNode) {
return (ArrayNode)jsonNode;
@@ -228,7 +227,7 @@ public class AtlasJson {
ArrayNode ret = createV1ArrayNode();
for (String json : jsonStrings) {
- JsonNode jsonNode = mapperV1.readTree(json);
+ JsonNode jsonNode = mapper.readTree(json);
ret.add(jsonNode);
}
@@ -263,6 +262,60 @@ public class AtlasJson {
}
}
+ /**
+  * Writes a {@link Referenceable} by handing it to {@code mapperV1}.
+  */
+ static class ReferenceableSerializer extends JsonSerializer<Referenceable> {
+     @Override
+     public void serialize(Referenceable value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
+         if (value == null) {
+             return; // nothing is written for a null value
+         }
+
+         mapperV1.writeValue(jgen, value);
+     }
+ }
+
+ /**
+  * Reads a {@link Referenceable} from the parser using {@code mapperV1}.
+  */
+ static class ReferenceableDeserializer extends JsonDeserializer<Referenceable> {
+     @Override
+     public Referenceable deserialize(JsonParser parser, DeserializationContext context) throws IOException {
+         return mapperV1.readValue(parser, Referenceable.class);
+     }
+ }
+
+ /**
+  * Writes a {@link Struct} by handing it to {@code mapperV1}.
+  */
+ static class StructSerializer extends JsonSerializer<Struct> {
+     @Override
+     public void serialize(Struct value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
+         if (value == null) {
+             return; // nothing is written for a null value
+         }
+
+         mapperV1.writeValue(jgen, value);
+     }
+ }
+
+ /**
+  * Reads a {@link Struct} from the parser using {@code mapperV1}.
+  */
+ static class StructDeserializer extends JsonDeserializer<Struct> {
+     @Override
+     public Struct deserialize(JsonParser parser, DeserializationContext context) throws IOException {
+         return mapperV1.readValue(parser, Struct.class);
+     }
+ }
+
+ /**
+  * Writes an {@link Id} by handing it to {@code mapperV1}.
+  */
+ static class IdSerializer extends JsonSerializer<Id> {
+     @Override
+     public void serialize(Id value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
+         if (value == null) {
+             return; // nothing is written for a null value
+         }
+
+         mapperV1.writeValue(jgen, value);
+     }
+ }
+
+ /**
+  * Reads an {@link Id} from the parser using {@code mapperV1}.
+  */
+ static class IdDeserializer extends JsonDeserializer<Id> {
+     @Override
+     public Id deserialize(JsonParser parser, DeserializationContext context) throws IOException {
+         return mapperV1.readValue(parser, Id.class);
+     }
+ }
+
static class HookNotificationDeserializer extends JsonDeserializer<HookNotification> {
@Override
public HookNotification deserialize(JsonParser parser, DeserializationContext context) throws IOException {
@@ -295,6 +348,22 @@ public class AtlasJson {
case ENTITY_DELETE:
ret = mapper.treeToValue(root, EntityDeleteRequest.class);
break;
+
+ case ENTITY_CREATE_V2:
+ ret = mapper.treeToValue(root, EntityCreateRequestV2.class);
+ break;
+
+ case ENTITY_PARTIAL_UPDATE_V2:
+ ret = mapper.treeToValue(root, EntityPartialUpdateRequestV2.class);
+ break;
+
+ case ENTITY_FULL_UPDATE_V2:
+ ret = mapper.treeToValue(root, EntityUpdateRequestV2.class);
+ break;
+
+ case ENTITY_DELETE_V2:
+ ret = mapper.treeToValue(root, EntityDeleteRequestV2.class);
+ break;
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/a02be15d/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index 47f72c5..3264e26 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -110,7 +110,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationBaseMessage.class);
- if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
+ if (msg == null || msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
ret = AtlasType.fromV1Json(messageJson, messageType);
} else {
String msgJson = messageJson;
http://git-wip-us.apache.org/repos/asf/atlas/blob/a02be15d/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
index cf691af..ccfd264 100644
--- a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
@@ -17,13 +17,29 @@
*/
package org.apache.atlas.notification.hook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
-import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+
import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -31,12 +47,19 @@ import static org.testng.Assert.assertTrue;
public class HookNotificationTest {
private HookMessageDeserializer deserializer = new HookMessageDeserializer();
+ private static final String ATTR_VALUE_STRING = "strValue";
+ private static final Integer ATTR_VALUE_INTEGER = 10;
+ private static final Boolean ATTR_VALUE_BOOLEAN = Boolean.TRUE;
+ private static final Date ATTR_VALUE_DATE = new Date();
+
@Test
public void testNewMessageSerDe() throws Exception {
Referenceable entity1 = new Referenceable("sometype");
+ Referenceable entity2 = new Referenceable("newtype");
+
entity1.set("attr", "value");
entity1.set("complex", new Referenceable("othertype"));
- Referenceable entity2 = new Referenceable("newtype");
+
String user = "user";
EntityCreateRequest request = new EntityCreateRequest(user, entity1, entity2);
@@ -98,4 +121,174 @@ public class HookNotificationTest {
assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE);
assertEquals(actualNotification.getUser(), HookNotification.UNKNOW_USER);
}
+
+ /**
+  * Round-trips an EntityCreateRequestV2 through AtlasJson.toJson() and the hook message
+  * deserializer, then checks type, user, entities, referred entity and attribute values.
+  */
+ @Test
+ public void testEntityCreateV2SerDe() throws Exception {
+     AtlasEntity entity1 = new AtlasEntity("sometype");
+     AtlasEntity entity2 = new AtlasEntity("newtype");
+     AtlasEntity entity3 = new AtlasEntity("othertype");
+
+     setAttributes(entity1);
+     entity1.setAttribute("complex", new AtlasObjectId(entity3.getGuid(), entity3.getTypeName()));
+
+     AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo();
+     entities.addEntity(entity1);
+     entities.addEntity(entity2);
+     entities.addReferredEntity(entity3);
+
+     String user = "user";
+     EntityCreateRequestV2 request = new EntityCreateRequestV2(user, entities);
+     String notificationJson = AtlasJson.toJson(request);
+     HookNotification actualNotification = deserializer.deserialize(notificationJson);
+
+     assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE_V2);
+     assertEquals(actualNotification.getUser(), user);
+
+     EntityCreateRequestV2 createRequest = (EntityCreateRequestV2) actualNotification;
+
+     assertEquals(createRequest.getEntities().getEntities().size(), 2);
+
+     AtlasEntity actualEntity1 = createRequest.getEntities().getEntities().get(0);
+     AtlasEntity actualEntity2 = createRequest.getEntities().getEntities().get(1);
+     AtlasEntity actualEntity3 = createRequest.getEntities().getReferredEntity(entity3.getGuid());
+     // the object-id attribute is expected back as a plain map; wildcard type instead of a raw Map
+     Map<?, ?> actualComplexAttr = (Map<?, ?>) actualEntity1.getAttribute("complex");
+
+     assertEquals(actualEntity1.getGuid(), entity1.getGuid());
+     assertEquals(actualEntity1.getTypeName(), entity1.getTypeName());
+     assertAttributes(actualEntity1);
+     assertEquals(actualComplexAttr.get(AtlasObjectId.KEY_GUID), entity3.getGuid());
+     assertEquals(actualComplexAttr.get(AtlasObjectId.KEY_TYPENAME), entity3.getTypeName());
+
+     assertEquals(actualEntity2.getGuid(), entity2.getGuid());
+     assertEquals(actualEntity2.getTypeName(), entity2.getTypeName());
+
+     assertEquals(actualEntity3.getGuid(), entity3.getGuid());
+     assertEquals(actualEntity3.getTypeName(), entity3.getTypeName());
+ }
+
+ /**
+  * Round-trips an EntityUpdateRequestV2 through AtlasJson.toJson() and the hook message
+  * deserializer, then checks type, user, entities, referred entity and attribute values.
+  */
+ @Test
+ public void testEntityUpdateV2SerDe() throws Exception {
+     AtlasEntity entity1 = new AtlasEntity("sometype");
+     AtlasEntity entity2 = new AtlasEntity("newtype");
+     AtlasEntity entity3 = new AtlasEntity("othertype");
+
+     setAttributes(entity1);
+     entity1.setAttribute("complex", new AtlasObjectId(entity3.getGuid(), entity3.getTypeName()));
+
+     AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo();
+     entities.addEntity(entity1);
+     entities.addEntity(entity2);
+     entities.addReferredEntity(entity3);
+
+     String user = "user";
+     EntityUpdateRequestV2 request = new EntityUpdateRequestV2(user, entities);
+     String notificationJson = AtlasJson.toJson(request);
+     HookNotification actualNotification = deserializer.deserialize(notificationJson);
+
+     assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_FULL_UPDATE_V2);
+     assertEquals(actualNotification.getUser(), user);
+
+     EntityUpdateRequestV2 updateRequest = (EntityUpdateRequestV2) actualNotification;
+
+     assertEquals(updateRequest.getEntities().getEntities().size(), 2);
+
+     AtlasEntity actualEntity1 = updateRequest.getEntities().getEntities().get(0);
+     AtlasEntity actualEntity2 = updateRequest.getEntities().getEntities().get(1);
+     AtlasEntity actualEntity3 = updateRequest.getEntities().getReferredEntity(entity3.getGuid());
+     // the object-id attribute is expected back as a plain map; wildcard type instead of a raw Map
+     Map<?, ?> actualComplexAttr = (Map<?, ?>) actualEntity1.getAttribute("complex");
+
+     assertEquals(actualEntity1.getGuid(), entity1.getGuid());
+     assertEquals(actualEntity1.getTypeName(), entity1.getTypeName());
+     assertAttributes(actualEntity1);
+     assertEquals(actualComplexAttr.get(AtlasObjectId.KEY_GUID), entity3.getGuid());
+     assertEquals(actualComplexAttr.get(AtlasObjectId.KEY_TYPENAME), entity3.getTypeName());
+
+     assertEquals(actualEntity2.getGuid(), entity2.getGuid());
+     assertEquals(actualEntity2.getTypeName(), entity2.getTypeName());
+
+     assertEquals(actualEntity3.getGuid(), entity3.getGuid());
+     assertEquals(actualEntity3.getTypeName(), entity3.getTypeName());
+ }
+
+ /**
+  * Round-trips an EntityPartialUpdateRequestV2 through AtlasJson.toJson() and the hook message
+  * deserializer, then checks type, user, the entity, both referred entities and attribute values.
+  */
+ @Test
+ public void testEntityPartialUpdateV2SerDe() throws Exception {
+     AtlasEntity entity1 = new AtlasEntity("sometype");
+     AtlasEntity entity2 = new AtlasEntity("newtype");
+     AtlasEntity entity3 = new AtlasEntity("othertype");
+
+     setAttributes(entity1);
+     entity1.setAttribute("complex", new AtlasObjectId(entity3.getGuid(), entity3.getTypeName()));
+
+     AtlasEntityWithExtInfo entity = new AtlasEntityWithExtInfo(entity1);
+     entity.addReferredEntity(entity2);
+     entity.addReferredEntity(entity3);
+
+     String user = "user";
+     EntityPartialUpdateRequestV2 request = new EntityPartialUpdateRequestV2(user, AtlasTypeUtil.getAtlasObjectId(entity1), entity);
+     String notificationJson = AtlasJson.toJson(request);
+     HookNotification actualNotification = deserializer.deserialize(notificationJson);
+
+     assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_PARTIAL_UPDATE_V2);
+     assertEquals(actualNotification.getUser(), user);
+
+     EntityPartialUpdateRequestV2 updateRequest = (EntityPartialUpdateRequestV2) actualNotification;
+
+     assertEquals(updateRequest.getEntity().getReferredEntities().size(), 2);
+
+     AtlasEntity actualEntity1 = updateRequest.getEntity().getEntity();
+     AtlasEntity actualEntity2 = updateRequest.getEntity().getReferredEntity(entity2.getGuid());
+     AtlasEntity actualEntity3 = updateRequest.getEntity().getReferredEntity(entity3.getGuid());
+     // the object-id attribute is expected back as a plain map; wildcard type instead of a raw Map
+     Map<?, ?> actualComplexAttr = (Map<?, ?>) actualEntity1.getAttribute("complex");
+
+     assertEquals(actualEntity1.getGuid(), entity1.getGuid());
+     assertEquals(actualEntity1.getTypeName(), entity1.getTypeName());
+     assertAttributes(actualEntity1);
+     assertEquals(actualComplexAttr.get(AtlasObjectId.KEY_GUID), entity3.getGuid());
+     assertEquals(actualComplexAttr.get(AtlasObjectId.KEY_TYPENAME), entity3.getTypeName());
+
+     assertEquals(actualEntity2.getGuid(), entity2.getGuid());
+     assertEquals(actualEntity2.getTypeName(), entity2.getTypeName());
+
+     assertEquals(actualEntity3.getGuid(), entity3.getGuid());
+     assertEquals(actualEntity3.getTypeName(), entity3.getTypeName());
+ }
+
+ @Test
+ public void testEntityDeleteV2SerDe() throws Exception {
+ AtlasEntity entity1 = new AtlasEntity("sometype");
+ AtlasEntity entity2 = new AtlasEntity("newtype");
+ AtlasEntity entity3 = new AtlasEntity("othertype");
+
+ List<AtlasObjectId> objectsToDelete = new ArrayList<>();
+ objectsToDelete.add(new AtlasObjectId(entity1.getGuid(), entity1.getTypeName()));
+ objectsToDelete.add(new AtlasObjectId(entity2.getGuid(), entity2.getTypeName()));
+ objectsToDelete.add(new AtlasObjectId(entity3.getGuid(), entity3.getTypeName()));
+
+ String user = "user";
+ EntityDeleteRequestV2 request = new EntityDeleteRequestV2(user, objectsToDelete);
+ String notificationJson = AtlasJson.toJson(request);
+ HookNotification actualNotification = deserializer.deserialize(notificationJson);
+
+ assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_DELETE_V2);
+ assertEquals(actualNotification.getUser(), user);
+
+ EntityDeleteRequestV2 deleteRequest = (EntityDeleteRequestV2) actualNotification;
+
+ assertEquals(deleteRequest.getEntities().size(), objectsToDelete.size());
+ assertEquals(deleteRequest.getEntities(), objectsToDelete);
+ }
+
+ private void setAttributes(AtlasEntity entity) { // test helper: sets one string, one integer, one boolean and one date attribute from the ATTR_VALUE_* constants
+ entity.setAttribute("attrStr", ATTR_VALUE_STRING);
+ entity.setAttribute("attrInt", ATTR_VALUE_INTEGER);
+ entity.setAttribute("attrBool", ATTR_VALUE_BOOLEAN);
+ entity.setAttribute("attrDate", ATTR_VALUE_DATE);
+ }
+
+ private void assertAttributes(AtlasEntity entity) { // test helper: checks the attributes written by setAttributes() on an entity that went through ser/de
+ assertEquals(entity.getAttribute("attrStr"), ATTR_VALUE_STRING);
+ assertEquals(entity.getAttribute("attrInt"), ATTR_VALUE_INTEGER);
+ assertEquals(entity.getAttribute("attrBool"), ATTR_VALUE_BOOLEAN);
+ assertEquals(entity.getAttribute("attrDate"), ATTR_VALUE_DATE.getTime()); // the date is expected back as its epoch-millisecond value, not as a Date instance
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/a02be15d/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index 0fe35b6..79e8e3e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -21,6 +21,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v1.EntityStream;
import org.apache.atlas.type.AtlasEntityType;
@@ -85,6 +86,16 @@ public interface AtlasEntityStore {
/**
* Update a single entity
+ * @param objectId ID of the entity; the AtlasEntityStoreV1 implementation uses its guid when one is assigned, otherwise it looks the guid up from the type name and unique attributes
+ * @param updatedEntityInfo updated entity information
+ * @param isPartialUpdate forwarded by AtlasEntityStoreV1 to createOrUpdate(); presumably true means only the supplied attributes are updated - TODO confirm
+ * @return EntityMutationResponse details of the updates performed by this call
+ * @throws AtlasBaseException on failure; AtlasEntityStoreV1 throws it for a null objectId/entity and for an unknown type name
+ *
+ */
+ EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException;
+
+ /**
+ * Update a single entity
* @param entityType type of the entity
* @param uniqAttributes Attributes that uniquely identify the entity
* @return EntityMutationResponse details of the updates performed by this call
http://git-wip-us.apache.org/repos/asf/atlas/blob/a02be15d/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index a020e9f..db7594d 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -226,6 +226,38 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
+ /**
+  * Updates a single entity identified by an {@link AtlasObjectId}.
+  *
+  * The target guid is taken from objectId when it holds an assigned guid; otherwise it is
+  * looked up from the object's type name and unique attributes. The resolved guid is written
+  * into the entity of updatedEntityInfo before the update is handed to createOrUpdate().
+  *
+  * @param objectId          ID of the entity to update
+  * @param updatedEntityInfo entity information holding the updated values; the guid of its entity is overwritten
+  * @param isPartialUpdate   forwarded unchanged to createOrUpdate()
+  * @return the response produced by createOrUpdate()
+  * @throws AtlasBaseException INVALID_PARAMETERS when objectId, updatedEntityInfo or its entity is null;
+  *                            UNKNOWN_TYPENAME when the guid has to be looked up and the type name is not registered
+  */
@Override
@GraphTransaction
+ public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException {
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("==> updateEntity({}, {}, {})", objectId, updatedEntityInfo, isPartialUpdate);
+     }
+
+     if (objectId == null || updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) {
+         throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "null entity-id/entity");
+     }
+
+     final String guid;
+
+     if (AtlasTypeUtil.isAssignedGuid(objectId.getGuid())) {
+         guid = objectId.getGuid();
+     } else {
+         AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objectId.getTypeName());
+
+         if (entityType == null) {
+             throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, objectId.getTypeName());
+         }
+
+         // reuse the type resolved and null-checked above instead of querying the registry a second time
+         guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
+     }
+
+     AtlasEntity entity = updatedEntityInfo.getEntity();
+
+     // the update is applied to whichever entity the object-id resolved to
+     entity.setGuid(guid);
+
+     return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), isPartialUpdate, false);
+ }
+
+ @Override
+ @GraphTransaction
public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes,
AtlasEntityWithExtInfo updatedEntityInfo) throws AtlasBaseException {
http://git-wip-us.apache.org/repos/asf/atlas/blob/a02be15d/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 456a778..ced9b7e 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -23,6 +23,7 @@ import kafka.utils.ShutdownableThread;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasBaseClient;
import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContextV1;
@@ -30,7 +31,13 @@ import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
import org.apache.atlas.notification.NotificationInterface.NotificationType;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
@@ -353,8 +360,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try {
// Used for intermediate conversions during create and update
- AtlasEntitiesWithExtInfo entities = null;
-
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) {
LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
@@ -366,8 +371,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
requestContext.setUser(messageUser);
switch (message.getType()) {
- case ENTITY_CREATE:
- final EntityCreateRequest createRequest = (EntityCreateRequest) message;
+ case ENTITY_CREATE: {
+ final EntityCreateRequest createRequest = (EntityCreateRequest) message;
+ final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = AtlasClient.API_V1.CREATE_ENTITY;
@@ -375,24 +381,20 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
audit(messageUser, api.getMethod(), api.getNormalizedPath());
}
- entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
-
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
- break;
+ }
+ break;
- case ENTITY_PARTIAL_UPDATE:
+ case ENTITY_PARTIAL_UPDATE: {
final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
+ final Referenceable referenceable = partialUpdateRequest.getEntity();
+ final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntity(referenceable);
if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = UPDATE_ENTITY_BY_ATTRIBUTE;
-
audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName()));
}
- Referenceable referenceable = partialUpdateRequest.getEntity();
-
- entities = instanceConverter.toAtlasEntity(referenceable);
-
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, Collections.singletonMap(partialUpdateRequest.getAttribute(), (Object)partialUpdateRequest.getAttributeValue()));
@@ -400,14 +402,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
entities.getEntities().get(0).setGuid(guid);
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
- break;
+ }
+ break;
- case ENTITY_DELETE:
+ case ENTITY_DELETE: {
final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;
if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = DELETE_ENTITY_BY_ATTRIBUTE;
-
audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), deleteRequest.getTypeName()));
}
@@ -416,12 +418,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
} catch (ClassCastException cle) {
- LOG.error("Failed to do a partial update on Entity");
+ LOG.error("Failed to delete entity {}", deleteRequest);
}
- break;
+ }
+ break;
- case ENTITY_FULL_UPDATE:
- final EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
+ case ENTITY_FULL_UPDATE: {
+ final EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
+ final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = UPDATE_ENTITY;
@@ -429,10 +433,70 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
audit(messageUser, api.getMethod(), api.getNormalizedPath());
}
- entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
+ atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
+ }
+ break;
+
+ case ENTITY_CREATE_V2: { // handles a V2 entity-create hook notification
+ final EntityCreateRequestV2 createRequestV2 = (EntityCreateRequestV2) message;
+ final AtlasEntitiesWithExtInfo entities = createRequestV2.getEntities();
+ // The request's AtlasEntitiesWithExtInfo is used directly; no instanceConverter call is made in this case.
+ if (numRetries == 0) { // audit only on the first attempt
+ AtlasBaseClient.API api = AtlasClientV2.API_V2.CREATE_ENTITY;
+ audit(messageUser, api.getMethod(), api.getNormalizedPath());
+ }
+ // NOTE(review): 'false' is presumably the partial-update flag -- confirm against AtlasEntityStore.createOrUpdate.
+ atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
+ }
+ break;
+
+ case ENTITY_PARTIAL_UPDATE_V2: { // handles a V2 partial-update hook notification for one entity
+ final EntityPartialUpdateRequestV2 partialUpdateRequest = (EntityPartialUpdateRequestV2) message;
+ final AtlasObjectId entityId = partialUpdateRequest.getEntityId();
+ final AtlasEntityWithExtInfo entity = partialUpdateRequest.getEntity();
+ // The audit entry below is recorded with the V2 UPDATE_ENTITY API descriptor.
+ if (numRetries == 0) { // audit only on the first attempt
+ AtlasBaseClient.API api = AtlasClientV2.API_V2.UPDATE_ENTITY;
+ audit(messageUser, api.getMethod(), api.getNormalizedPath());
+ }
+ // NOTE(review): 'true' presumably marks this as a partial update -- confirm against AtlasEntityStore.updateEntity.
+ atlasEntityStore.updateEntity(entityId, entity, true);
+ }
+ break;
+
+ case ENTITY_FULL_UPDATE_V2: { // handles a V2 full-update hook notification
+ final EntityUpdateRequestV2 updateRequest = (EntityUpdateRequestV2) message;
+ final AtlasEntitiesWithExtInfo entities = updateRequest.getEntities();
+ // The request's entities are handed to createOrUpdate below, the same store call the ENTITY_CREATE_V2 case makes.
+ if (numRetries == 0) { // audit only on the first attempt
+ AtlasBaseClient.API api = AtlasClientV2.API_V2.UPDATE_ENTITY;
+ audit(messageUser, api.getMethod(), api.getNormalizedPath());
+ }
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
- break;
+ }
+ break;
+
+ case ENTITY_DELETE_V2: { // handles a V2 delete hook notification carrying a list of object ids
+ final EntityDeleteRequestV2 deleteRequest = (EntityDeleteRequestV2) message;
+ final List<AtlasObjectId> entities = deleteRequest.getEntities();
+ // Each id is deleted by type name + unique attributes; the try wraps the whole loop, so a cast failure skips the remaining ids.
+ try {
+ for (AtlasObjectId entity : entities) {
+ if (numRetries == 0) { // audit only on the first attempt
+ AtlasBaseClient.API api = AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE;
+ audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), entity.getTypeName()));
+ }
+ // The cast below is what raises ClassCastException when the registry returns a non-entity type for this name.
+ AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(entity.getTypeName());
+
+ atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());
+ }
+ } catch (ClassCastException cle) {
+ LOG.error("Failed to delete entities {}", entities, cle); // pass the exception so the cause is logged (assumes LOG is SLF4J -- confirm)
+ }
+ }
+ break;
default:
throw new IllegalStateException("Unknown notification type: " + message.getType().name());
@@ -541,4 +605,4 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, DateTimeHelper.formatDateUTC(new Date()));
}
-}
\ No newline at end of file
+}