You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2015/12/14 17:45:10 UTC
incubator-atlas git commit: ATLAS-386 Handle hive rename Table
(shwethags)
Repository: incubator-atlas
Updated Branches:
refs/heads/branch-0.6-incubating dc2ca4520 -> dad90970d
ATLAS-386 Handle hive rename Table (shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/dad90970
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/dad90970
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/dad90970
Branch: refs/heads/branch-0.6-incubating
Commit: dad90970da80038757619bd752e72a16e27d36c2
Parents: dc2ca45
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Mon Dec 14 22:15:01 2015 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Mon Dec 14 22:15:01 2015 +0530
----------------------------------------------------------------------
addons/hive-bridge/pom.xml | 4 +
.../atlas/hive/bridge/HiveMetaStoreBridge.java | 4 +
.../org/apache/atlas/hive/hook/HiveHook.java | 112 ++++++------
.../org/apache/atlas/hive/hook/HiveHookIT.java | 15 +-
.../main/java/org/apache/atlas/AtlasClient.java | 13 +-
docs/src/site/twiki/Bridge-Hive.twiki | 2 +-
.../apache/atlas/kafka/KafkaNotification.java | 2 +-
.../notification/AbstractNotification.java | 19 ++
.../AbstractNotificationConsumer.java | 42 ++++-
.../notification/NotificationHookConsumer.java | 49 ++++--
.../notification/NotificationInterface.java | 8 +-
.../NotificationEntityChangeListener.java | 32 +---
.../notification/hook/HookNotification.java | 174 +++++++++++++++++++
.../atlas/kafka/KafkaNotificationTest.java | 20 ---
.../notification/hook/HookNotificationTest.java | 68 ++++++++
pom.xml | 9 +-
release-log.txt | 1 +
.../atlas/services/DefaultMetadataService.java | 7 +
.../test/java/org/apache/atlas/TestUtils.java | 4 +-
.../graph/TitanGraphProviderTest.java | 3 -
.../service/DefaultMetadataServiceTest.java | 46 +++--
.../apache/atlas/typesystem/Referenceable.java | 6 +-
.../atlas/typesystem/types/ClassType.java | 6 +-
.../atlas/typesystem/types/Multiplicity.java | 4 +-
.../src/main/resources/application.properties | 6 +-
webapp/pom.xml | 4 +
.../src/main/java/org/apache/atlas/Atlas.java | 4 +
.../NotificationHookConsumerIT.java | 111 ++++++++----
.../web/resources/EntityJerseyResourceIT.java | 31 ++++
29 files changed, 602 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index db842d7..4b0ac0f 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -290,6 +290,10 @@
<name>atlas.log.dir</name>
<value>${project.build.directory}/logs</value>
</systemProperty>
+ <systemProperty>
+ <name>atlas.data</name>
+ <value>${project.build.directory}/data</value>
+ </systemProperty>
</systemProperties>
<stopKey>atlas-stop</stopKey>
<stopPort>31001</stopPort>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index f367317..ee5ae10 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -70,6 +70,10 @@ public class HiveMetaStoreBridge {
this(hiveConf, atlasConf, null, null);
}
+ public String getClusterName() {
+ return clusterName;
+ }
+
/**
* Construct a HiveMetaStoreBridge.
* @param hiveConf hive conf
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 2f88446..37a3169 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -25,11 +25,12 @@ import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
+import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -47,16 +48,12 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.security.UserGroupInformation;
-import org.codehaus.jettison.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -109,6 +106,8 @@ public class HiveHook implements ExecuteWithHookContext {
@Inject
private static NotificationInterface notifInterface;
+ private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
+
private static final HiveConf hiveConf;
static {
@@ -233,37 +232,51 @@ public class HiveHook implements ExecuteWithHookContext {
default:
}
+
+ notifyAtlas();
}
- //todo re-write with notification
private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
//crappy, no easy of getting new name
assert event.inputs != null && event.inputs.size() == 1;
assert event.outputs != null && event.outputs.size() > 0;
- Table oldTable = event.inputs.iterator().next().getTable();
- Table newTable = null;
+ //Update entity if not exists
+ ReadEntity oldEntity = event.inputs.iterator().next();
+ Table oldTable = oldEntity.getTable();
+
for (WriteEntity writeEntity : event.outputs) {
if (writeEntity.getType() == Entity.Type.TABLE) {
- Table table = writeEntity.getTable();
- if (table.getDbName().equals(oldTable.getDbName()) && !table.getTableName()
+ Table newTable = writeEntity.getTable();
+ if (newTable.getDbName().equals(oldTable.getDbName()) && !newTable.getTableName()
.equals(oldTable.getTableName())) {
- newTable = table;
- break;
+
+ //Create/update old table entity - create new entity and replace id
+ Referenceable tableEntity = createEntities(dgiBridge, writeEntity);
+ String oldQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
+ oldTable.getDbName(), oldTable.getTableName());
+ tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName);
+ tableEntity.set(HiveDataModelGenerator.TABLE_NAME, oldTable.getTableName().toLowerCase());
+
+
+ String newQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
+ newTable.getDbName(), newTable.getTableName());
+
+ Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+ newEntity.set(HiveDataModelGenerator.NAME, newQualifiedName);
+ newEntity.set(HiveDataModelGenerator.TABLE_NAME, newTable.getTableName().toLowerCase());
+ messages.add(new HookNotification.EntityPartialUpdateRequest(HiveDataTypes.HIVE_TABLE.getName(),
+ HiveDataModelGenerator.NAME, oldQualifiedName, newEntity));
}
}
}
- if (newTable == null) {
- LOG.warn("Failed to deduct new name for " + event.queryStr);
- return;
- }
}
- private Map<Type, Referenceable> createEntities(HiveMetaStoreBridge dgiBridge, Entity entity) throws Exception {
- Map<Type, Referenceable> entities = new LinkedHashMap<>();
+ private Referenceable createEntities(HiveMetaStoreBridge dgiBridge, Entity entity) throws Exception {
Database db = null;
Table table = null;
Partition partition = null;
+ List<Referenceable> entities = new ArrayList<>();
switch (entity.getType()) {
case DATABASE:
@@ -283,64 +296,54 @@ public class HiveHook implements ExecuteWithHookContext {
}
db = dgiBridge.hiveClient.getDatabase(db.getName());
- Referenceable dbReferenceable = dgiBridge.createDBInstance(db);
- entities.put(Type.DATABASE, dbReferenceable);
+ Referenceable dbEntity = dgiBridge.createDBInstance(db);
+ entities.add(dbEntity);
- Referenceable tableReferenceable = null;
+ Referenceable tableEntity = null;
if (table != null) {
table = dgiBridge.hiveClient.getTable(table.getDbName(), table.getTableName());
- tableReferenceable = dgiBridge.createTableInstance(dbReferenceable, table);
- entities.put(Type.TABLE, tableReferenceable);
+ tableEntity = dgiBridge.createTableInstance(dbEntity, table);
+ entities.add(tableEntity);
}
if (partition != null) {
- Referenceable partitionReferenceable = dgiBridge.createPartitionReferenceable(tableReferenceable,
- (Referenceable) tableReferenceable.get("sd"), partition);
- entities.put(Type.PARTITION, partitionReferenceable);
+ Referenceable partitionEntity = dgiBridge.createPartitionReferenceable(tableEntity,
+ (Referenceable) tableEntity.get("sd"), partition);
+ entities.add(partitionEntity);
}
- return entities;
+
+ messages.add(new HookNotification.EntityUpdateRequest(entities));
+ return tableEntity;
}
private void handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEvent event, Type entityType) throws Exception {
- List<Referenceable> entities = new ArrayList<>();
for (WriteEntity entity : event.outputs) {
if (entity.getType() == entityType) {
- entities.addAll(createEntities(dgiBridge, entity).values());
+ createEntities(dgiBridge, entity);
}
}
- notifyEntity(entities);
- }
-
- private void notifyEntity(Collection<Referenceable> entities) {
- JSONArray entitiesArray = new JSONArray();
- for (Referenceable entity : entities) {
- String entityJson = InstanceSerialization.toJson(entity, true);
- entitiesArray.put(entityJson);
- }
- notifyEntity(entitiesArray);
}
/**
* Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities.
* De-duping of entities is done on server side depending on the unique attribute on the
- * @param entities
*/
- private void notifyEntity(JSONArray entities) {
+ private void notifyAtlas() {
int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
- String message = entities.toString();
+ LOG.debug("Notifying atlas with messages {}", messages);
int numRetries = 0;
while (true) {
try {
- notifInterface.send(NotificationInterface.NotificationType.HOOK, message);
- return;
+ notifInterface.send(NotificationInterface.NotificationType.HOOK, messages);
+ break;
} catch(Exception e) {
numRetries++;
if(numRetries < maxRetries) {
- LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
+ LOG.debug("Failed to notify atlas. Retrying", e);
} else {
- LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", message,
- maxRetries, e);
+ LOG.error("Failed to notify atlas after {} retries. Quitting", maxRetries, e);
+ break;
}
}
}
@@ -369,7 +372,7 @@ public class HiveHook implements ExecuteWithHookContext {
String queryStr = normalize(event.queryStr);
LOG.debug("Registering query: {}", queryStr);
- List<Referenceable> entities = new ArrayList<>();
+
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
processReferenceable.set("name", queryStr);
processReferenceable.set("operationType", event.operation.getOperationName());
@@ -379,9 +382,8 @@ public class HiveHook implements ExecuteWithHookContext {
List<Referenceable> source = new ArrayList<>();
for (ReadEntity readEntity : inputs) {
if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) {
- Map<Type, Referenceable> localEntities = createEntities(dgiBridge, readEntity);
- source.add(localEntities.get(Type.TABLE));
- entities.addAll(localEntities.values());
+ Referenceable inTable = createEntities(dgiBridge, readEntity);
+ source.add(inTable);
}
}
processReferenceable.set("inputs", source);
@@ -389,9 +391,8 @@ public class HiveHook implements ExecuteWithHookContext {
List<Referenceable> target = new ArrayList<>();
for (WriteEntity writeEntity : outputs) {
if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) {
- Map<Type, Referenceable> localEntities = createEntities(dgiBridge, writeEntity);
- target.add(localEntities.get(Type.TABLE));
- entities.addAll(localEntities.values());
+ Referenceable outTable = createEntities(dgiBridge, writeEntity);
+ target.add(outTable);
}
}
processReferenceable.set("outputs", target);
@@ -402,8 +403,7 @@ public class HiveHook implements ExecuteWithHookContext {
//TODO set
processReferenceable.set("queryGraph", "queryGraph");
- entities.add(processReferenceable);
- notifyEntity(entities);
+ messages.add(new HookNotification.EntityCreateRequest(processReferenceable));
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index 5447de5..1c3d9a4 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -149,6 +149,17 @@ public class HiveHookIT {
assertDatabaseIsRegistered(DEFAULT_DB);
}
+ @Test
+ public void testRenameTable() throws Exception {
+ String tableName = createTable();
+ String newTableName = tableName();
+ runCommand(String.format("alter table %s rename to %s", tableName, newTableName));
+
+ assertTableIsRegistered(DEFAULT_DB, newTableName);
+ assertTableIsNotRegistered(DEFAULT_DB, tableName);
+ }
+
+
private String assertColumnIsRegistered(String colName) throws Exception {
LOG.debug("Searching for column {}", colName);
String query =
@@ -327,8 +338,8 @@ public class HiveHookIT {
LOG.debug("Searching for partition of {}.{} with values {}", dbName, tableName, value);
String dslQuery = String.format("%s as p where values = ['%s'], table where tableName = '%s', "
- + "db where name = '%s' and clusterName = '%s' select p", typeName, value,
- tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
+ + "db where name = '%s' and clusterName = '%s' select p", typeName, value,
+ tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
return assertEntityIsRegistered(dslQuery, "p");
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index b108b25..0a730af 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -44,6 +44,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
@@ -291,12 +292,16 @@ public class AtlasClient {
}
public JSONArray createEntity(Referenceable... entities) throws AtlasServiceException {
+ return createEntity(Arrays.asList(entities));
+ }
+
+ public JSONArray createEntity(Collection<Referenceable> entities) throws AtlasServiceException {
JSONArray entityArray = getEntitiesArray(entities);
return createEntity(entityArray);
}
- private JSONArray getEntitiesArray(Referenceable[] entities) {
- JSONArray entityArray = new JSONArray(entities.length);
+ private JSONArray getEntitiesArray(Collection<Referenceable> entities) {
+ JSONArray entityArray = new JSONArray(entities.size());
for (Referenceable entity : entities) {
entityArray.put(InstanceSerialization.toJson(entity, true));
}
@@ -311,6 +316,10 @@ public class AtlasClient {
* @throws AtlasServiceException
*/
public JSONArray updateEntities(Referenceable... entities) throws AtlasServiceException {
+ return updateEntities(Arrays.asList(entities));
+ }
+
+ public JSONArray updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
JSONArray entitiesArray = getEntitiesArray(entities);
JSONObject response = callAPI(API.UPDATE_ENTITY, entitiesArray.toString());
try {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/docs/src/site/twiki/Bridge-Hive.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Bridge-Hive.twiki b/docs/src/site/twiki/Bridge-Hive.twiki
index c99f046..0c7732b 100644
--- a/docs/src/site/twiki/Bridge-Hive.twiki
+++ b/docs/src/site/twiki/Bridge-Hive.twiki
@@ -29,7 +29,7 @@ hive_process - attribute name - <queryString> - trimmed query string in lower ca
---++ Importing Hive Metadata
org.apache.atlas.hive.bridge.HiveMetaStoreBridge imports the hive metadata into Atlas using the model defined in org.apache.atlas.hive.model.HiveDataModelGenerator. import-hive.sh command can be used to facilitate this.
-Set the following configuration in <atlas-conf>/client.properties and set environment variable HIVE_CONFIG to the hive conf directory:
+Set the following configuration in <atlas-conf>/client.properties and set environment variable $HIVE_CONF_DIR to the hive conf directory:
<verbatim>
<property>
<name>atlas.cluster.name</name>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index bacabeb..37467b3 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -174,7 +174,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
}
@Override
- public void send(NotificationType type, String... messages) throws NotificationException {
+ public void _send(NotificationType type, String... messages) throws NotificationException {
if (producer == null) {
createProducer();
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index f7bb7b1..72b5a0a 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -20,6 +20,9 @@ package org.apache.atlas.notification;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
+import java.util.Arrays;
+import java.util.List;
+
/**
* Abstract notification interface implementation.
*/
@@ -46,4 +49,20 @@ public abstract class AbstractNotification implements NotificationInterface {
protected final boolean isEmbedded() {
return embedded;
}
+
+ @Override
+ public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+ String[] strMessages = new String[messages.size()];
+ for (int index = 0; index < messages.size(); index++) {
+ strMessages[index] = AbstractNotificationConsumer.GSON.toJson(messages.get(index));
+ }
+ _send(type, strMessages);
+ }
+
+ @Override
+ public <T> void send(NotificationType type, T... messages) throws NotificationException {
+ send(type, Arrays.asList(messages));
+ }
+
+ protected abstract void _send(NotificationType type, String[] messages) throws NotificationException;
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index 8c49d4a..42a4e7f 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -26,11 +26,16 @@ import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
import com.google.gson.reflect.TypeToken;
import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.notification.entity.EntityNotificationImpl;
+import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
+import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.codehaus.jettison.json.JSONArray;
@@ -45,13 +50,15 @@ import java.util.Map;
*/
public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> {
- private static final Gson GSON = new GsonBuilder().
+ public static final Gson GSON = new GsonBuilder().
registerTypeAdapter(ImmutableList.class, new ImmutableListDeserializer()).
registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()).
registerTypeAdapter(EntityNotification.class, new EntityNotificationDeserializer()).
registerTypeAdapter(IStruct.class, new StructDeserializer()).
- registerTypeAdapter(IReferenceableInstance.class, new ReferenceableDeserializer()).
- registerTypeAdapter(JSONArray.class, new JSONArrayDeserializer()).
+ registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializerDeserializer()).
+ registerTypeAdapter(Referenceable.class, new ReferenceableSerializerDeserializer()).
+ registerTypeAdapter(JSONArray.class, new JSONArraySerializerDeserializer()).
+ registerTypeAdapter(HookNotification.HookNotificationMessage.class, new HookNotification()).
create();
private final Class<T> type;
@@ -136,30 +143,44 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
// ----- inner class : StructDeserializer -------------------------------
- public final static class StructDeserializer implements JsonDeserializer<IStruct> {
+ public final static class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> {
@Override
public IStruct deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException {
return context.deserialize(json, Struct.class);
}
+
+ @Override
+ public JsonElement serialize(IStruct src, Type typeOfSrc, JsonSerializationContext context) {
+ String instanceJson = InstanceSerialization.toJson(src, true);
+ return new JsonParser().parse(instanceJson).getAsJsonObject();
+ }
}
- // ----- inner class : ReferenceableDeserializer ------------------------
+ // ----- inner class : ReferenceableSerializerDeserializer ------------------------
- public final static class ReferenceableDeserializer implements JsonDeserializer<IStruct> {
+ public final static class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>,
+ JsonSerializer<IReferenceableInstance> {
@Override
public IReferenceableInstance deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException {
return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
}
+
+ @Override
+ public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) {
+ String instanceJson = InstanceSerialization.toJson(src, true);
+ return new JsonParser().parse(instanceJson).getAsJsonObject();
+ }
}
- // ----- inner class : JSONArrayDeserializer ----------------------------
+ // ----- inner class : JSONArraySerializerDeserializer ----------------------------
- public final static class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
+ public final static class JSONArraySerializerDeserializer implements JsonDeserializer<JSONArray>,
+ JsonSerializer<JSONArray> {
@Override
public JSONArray deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException {
@@ -170,5 +191,10 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
throw new JsonParseException(e.getMessage(), e);
}
}
+
+ @Override
+ public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) {
+ return new JsonParser().parse(src.toString()).getAsJsonArray();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 1bee26f..6876758 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -23,9 +23,9 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
-import org.codehaus.jettison.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,12 +57,12 @@ public class NotificationHookConsumer implements Service {
String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
atlasClient = new AtlasClient(atlasEndpoint);
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
- List<NotificationConsumer<JSONArray>> consumers =
+ List<NotificationConsumer<HookNotification.HookNotificationMessage>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
executors = Executors.newFixedThreadPool(consumers.size());
- for (final NotificationConsumer<JSONArray> consumer : consumers) {
- executors.submit(new HookConsumer(consumer));
+ for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : consumers) {
+ executors.submit(new HookConsumer(atlasClient, consumer));
}
}
@@ -86,15 +86,12 @@ public class NotificationHookConsumer implements Service {
}
class HookConsumer implements Runnable {
- private final NotificationConsumer<JSONArray> consumer;
+ private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
private final AtlasClient client;
- public HookConsumer(NotificationConsumer<JSONArray> consumer) {
- this(atlasClient, consumer);
- }
-
- public HookConsumer(AtlasClient client, NotificationConsumer<JSONArray> consumer) {
- this.client = client;
+ public HookConsumer(AtlasClient atlasClient,
+ NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
+ this.client = atlasClient;
this.consumer = consumer;
}
@@ -106,14 +103,32 @@ public class NotificationHookConsumer implements Service {
}
while(consumer.hasNext()) {
- JSONArray entityJson = consumer.next();
- LOG.info("Processing message {}", entityJson);
+ HookNotification.HookNotificationMessage message = consumer.next();
+
try {
- JSONArray guids = atlasClient.createEntity(entityJson);
- LOG.info("Create entities with guid {}", guids);
+ switch (message.getType()) {
+ case ENTITY_CREATE:
+ HookNotification.EntityCreateRequest createRequest =
+ (HookNotification.EntityCreateRequest) message;
+ atlasClient.createEntity(createRequest.getEntities());
+ break;
+
+ case ENTITY_PARTIAL_UPDATE:
+ HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
+ (HookNotification.EntityPartialUpdateRequest) message;
+ atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
+ partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(),
+ partialUpdateRequest.getEntity());
+ break;
+
+ case ENTITY_FULL_UPDATE:
+ HookNotification.EntityUpdateRequest updateRequest =
+ (HookNotification.EntityUpdateRequest) message;
+ atlasClient.updateEntities(updateRequest.getEntities());
+ break;
+ }
} catch (Exception e) {
- //todo handle failures
- LOG.warn("Error handling message {}", entityJson, e);
+ LOG.debug("Error handling message {}", message, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 3e68998..e4c4fd6 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -18,7 +18,7 @@
package org.apache.atlas.notification;
import org.apache.atlas.notification.entity.EntityNotification;
-import org.codehaus.jettison.json.JSONArray;
+import org.apache.atlas.notification.hook.HookNotification;
import java.util.List;
@@ -28,7 +28,7 @@ public interface NotificationInterface {
String PROPERTY_PREFIX = "atlas.notification";
enum NotificationType {
- HOOK(JSONArray.class), ENTITIES(EntityNotification.class);
+ HOOK(HookNotification.HookNotificationMessage.class), ENTITIES(EntityNotification.class);
private final Class classType;
@@ -52,7 +52,9 @@ public interface NotificationInterface {
*/
<T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers);
- void send(NotificationType type, String... messages) throws NotificationException;
+ <T> void send(NotificationType type, T... messages) throws NotificationException;
+
+ <T> void send(NotificationType type, List<T> messages) throws NotificationException;
void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
index e2d16df..243f93e 100644
--- a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java
@@ -17,12 +17,6 @@
package org.apache.atlas.notification.entity;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.notification.NotificationInterface;
@@ -30,10 +24,8 @@ import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.types.TypeSystem;
-import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
@@ -44,9 +36,6 @@ import java.util.List;
*/
public class NotificationEntityChangeListener implements EntityChangeListener {
- private static final Gson GSON = new GsonBuilder().
- registerTypeAdapter(Referenceable.class, new ReferencableSerializer()).create();
-
private final NotificationInterface notificationInterface;
private final TypeSystem typeSystem;
@@ -93,7 +82,7 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
// send notification of entity change
private void notifyOfEntityEvent(Collection<ITypedReferenceableInstance> entityDefinitions,
EntityNotification.OperationType operationType) throws AtlasException {
- List<String> messages = new LinkedList<>();
+ List<EntityNotification> messages = new LinkedList<>();
for (IReferenceableInstance entityDefinition : entityDefinitions) {
Referenceable entity = new Referenceable(entityDefinition);
@@ -101,24 +90,9 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
EntityNotificationImpl notification =
new EntityNotificationImpl(entity, operationType, typeSystem);
- messages.add(GSON.toJson(notification));
+ messages.add(notification);
}
- notificationInterface.send(NotificationInterface.NotificationType.ENTITIES,
- messages.toArray(new String[messages.size()]));
- }
-
-
- // ----- inner class : ReferencableSerializer ---------------------------
-
- private static class ReferencableSerializer implements JsonSerializer<Referenceable> {
-
- public static final JsonParser JSON_PARSER = new JsonParser();
-
- @Override
- public JsonElement serialize(Referenceable referenceable, Type type,
- JsonSerializationContext jsonSerializationContext) {
- return JSON_PARSER.parse(InstanceSerialization.toJson(referenceable, true)).getAsJsonObject();
- }
+ notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, messages);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
new file mode 100644
index 0000000..568f58b
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.hook;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class HookNotification implements JsonDeserializer<HookNotification.HookNotificationMessage> {
+
+ @Override
+ public HookNotificationMessage deserialize(JsonElement json, Type typeOfT,
+ JsonDeserializationContext context) throws JsonParseException {
+ if (json.isJsonArray()) {
+ JSONArray jsonArray = context.deserialize(json, JSONArray.class);
+ return new EntityCreateRequest(jsonArray);
+ } else {
+ HookNotificationType type =
+ context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class);
+ switch (type) {
+ case ENTITY_CREATE:
+ return context.deserialize(json, EntityCreateRequest.class);
+
+ case ENTITY_FULL_UPDATE:
+ return context.deserialize(json, EntityUpdateRequest.class);
+
+ case ENTITY_PARTIAL_UPDATE:
+ return context.deserialize(json, EntityPartialUpdateRequest.class);
+
+ case TYPE_CREATE:
+ case TYPE_UPDATE:
+ return context.deserialize(json, TypeRequest.class);
+ }
+ throw new IllegalStateException("Unhandled type " + type);
+ }
+ }
+
+ public enum HookNotificationType {
+ TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE
+ }
+
+ public static class HookNotificationMessage {
+ protected HookNotificationType type;
+
+ private HookNotificationMessage() { }
+
+ public HookNotificationMessage(HookNotificationType type) {
+ this.type = type;
+ }
+
+ public HookNotificationType getType() {
+ return type;
+ }
+ }
+
+ public static class TypeRequest extends HookNotificationMessage {
+ private TypesDef typesDef;
+
+ private TypeRequest() { }
+
+ public TypeRequest(HookNotificationType type, TypesDef typesDef) {
+ super(type);
+ this.typesDef = typesDef;
+ }
+
+ public TypesDef getTypesDef() {
+ return typesDef;
+ }
+ }
+
+ public static class EntityCreateRequest extends HookNotificationMessage {
+ private List<Referenceable> entities;
+
+ private EntityCreateRequest() { }
+
+ public EntityCreateRequest(Referenceable... entities) {
+ super(HookNotificationType.ENTITY_CREATE);
+ this.entities = Arrays.asList(entities);
+ }
+
+ protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities) {
+ super(type);
+ this.entities = entities;
+ }
+
+ public EntityCreateRequest(JSONArray jsonArray) {
+ super(HookNotificationType.ENTITY_CREATE);
+ entities = new ArrayList<>();
+ for (int index = 0; index < jsonArray.length(); index++) {
+ try {
+ entities.add(InstanceSerialization.fromJsonReferenceable(jsonArray.getString(index), true));
+ } catch (JSONException e) {
+ throw new JsonParseException(e);
+ }
+ }
+ }
+
+ public List<Referenceable> getEntities() throws JSONException {
+ return entities;
+ }
+ }
+
+ public static class EntityUpdateRequest extends EntityCreateRequest {
+ public EntityUpdateRequest(Referenceable... entities) {
+ this(Arrays.asList(entities));
+ }
+
+ public EntityUpdateRequest(List<Referenceable> entities) {
+ super(HookNotificationType.ENTITY_FULL_UPDATE, entities);
+ }
+ }
+
+ public static class EntityPartialUpdateRequest extends HookNotificationMessage {
+ private String typeName;
+ private String attribute;
+ private Referenceable entity;
+ private String attributeValue;
+
+ private EntityPartialUpdateRequest() { }
+
+ public EntityPartialUpdateRequest(String typeName, String attribute, String attributeValue,
+ Referenceable entity) {
+ super(HookNotificationType.ENTITY_PARTIAL_UPDATE);
+ this.typeName = typeName;
+ this.attribute = attribute;
+ this.attributeValue = attributeValue;
+ this.entity = entity;
+ }
+
+ public String getTypeName() {
+ return typeName;
+ }
+
+ public String getAttribute() {
+ return attribute;
+ }
+
+ public Referenceable getEntity() {
+ return entity;
+ }
+
+ public String getAttributeValue() {
+ return attributeValue;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index 625a0b0..eb90f52 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -26,9 +26,7 @@ import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.commons.configuration.Configuration;
-import org.apache.commons.lang.RandomStringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.codehaus.jettison.json.JSONArray;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
@@ -61,20 +59,6 @@ public class KafkaNotificationTest {
}
@Test
- public void testSendReceiveMessage() throws Exception {
- String msg1 = "[{\"message\": " + 123 + "}]";
- String msg2 = "[{\"message\": " + 456 + "}]";
- kafka.send(NotificationInterface.NotificationType.HOOK, msg1, msg2);
- List<NotificationConsumer<JSONArray>> consumers =
- kafka.createConsumers(NotificationInterface.NotificationType.HOOK, 1);
- NotificationConsumer<JSONArray> consumer = consumers.get(0);
- assertTrue(consumer.hasNext());
- assertEquals(new JSONArray(msg1), consumer.next());
- assertTrue(consumer.hasNext());
- assertEquals(new JSONArray(msg2), consumer.next());
- }
-
- @Test
@SuppressWarnings("unchecked")
public void testCreateConsumers() throws Exception {
Configuration configuration = mock(Configuration.class);
@@ -119,10 +103,6 @@ public class KafkaNotificationTest {
assertEquals(groupId, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
}
- private String random() {
- return RandomStringUtils.randomAlphanumeric(5);
- }
-
@AfterClass
public void teardown() throws Exception {
kafka.stop();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
new file mode 100644
index 0000000..4b9f81f
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.hook;
+
+import org.apache.atlas.notification.AbstractNotificationConsumer;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.codehaus.jettison.json.JSONArray;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class HookNotificationTest {
+
+ @Test
+ public void testMessageBackwardCompatibility() throws Exception {
+ JSONArray jsonArray = new JSONArray();
+ Referenceable entity = new Referenceable("sometype");
+ entity.set("name", "somename");
+ String entityJson = InstanceSerialization.toJson(entity, true);
+ jsonArray.put(entityJson);
+
+ HookNotification.HookNotificationMessage notification = AbstractNotificationConsumer.GSON.fromJson(
+ jsonArray.toString(), HookNotification.HookNotificationMessage.class);
+ assertNotNull(notification);
+ assertEquals(notification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
+ HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) notification;
+ assertEquals(createRequest.getEntities().size(), 1);
+ assertEquals(createRequest.getEntities().get(0).getTypeName(), entity.getTypeName());
+ }
+
+ @Test
+ public void testNewMessageSerDe() throws Exception {
+ Referenceable entity1 = new Referenceable("sometype");
+ entity1.set("attr", "value");
+ entity1.set("complex", new Referenceable("othertype"));
+ Referenceable entity2 = new Referenceable("newtype");
+ HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(entity1, entity2);
+
+ String notificationJson = AbstractNotificationConsumer.GSON.toJson(request);
+ HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson(
+ notificationJson, HookNotification.HookNotificationMessage.class);
+ assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
+ HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) actualNotification;
+ assertEquals(createRequest.getEntities().size(), 2);
+ Referenceable actualEntity1 = createRequest.getEntities().get(0);
+ assertEquals(actualEntity1.getTypeName(), "sometype");
+ assertEquals(((Referenceable)actualEntity1.get("complex")).getTypeName(), "othertype");
+ assertEquals(createRequest.getEntities().get(1).getTypeName(), "newtype");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1a324b7..3a368f3 100755
--- a/pom.xml
+++ b/pom.xml
@@ -354,7 +354,8 @@
<!-- skips checkstyle and find bugs -->
<skipCheck>false</skipCheck>
- <skipTests>false</skipTests>
+ <skipUTs>false</skipUTs>
+ <skipITs>false</skipITs>
<skipDocs>true</skipDocs>
<skipSite>true</skipSite>
<projectBaseDir>${project.basedir}</projectBaseDir>
@@ -1087,7 +1088,7 @@
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
- <version>2.3.1</version>
+ <version>2.5</version>
</dependency>
<dependency>
@@ -1394,6 +1395,7 @@
<configuration>
<systemProperties>
<user.dir>${project.basedir}</user.dir>
+ <atlas.data>${project.build.directory}/data</atlas.data>
</systemProperties>
<!--<skipTests>true</skipTests>-->
<forkMode>always</forkMode>
@@ -1403,6 +1405,7 @@
-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml
-Djava.net.preferIPv4Stack=true
</argLine>
+ <skip>${skipUTs}</skip>
<excludes>
<exclude>**/*Base*</exclude>
</excludes>
@@ -1423,12 +1426,14 @@
<configuration>
<systemPropertyVariables>
<projectBaseDir>${projectBaseDir}</projectBaseDir>
+ <atlas.data>${project.build.directory}/data</atlas.data>
</systemPropertyVariables>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<argLine>-Djava.awt.headless=true -Dproject.version=${project.version}
-Dhadoop.tmp.dir="${project.build.directory}/tmp-hadoop-${user.name}"
-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml
</argLine>
+ <skip>${skipITs}</skip>
<parallel>none</parallel>
<reuseForks>false</reuseForks>
<forkCount>1</forkCount>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index f058148..64dc568 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES:
+ATLAS-386 Handle hive rename Table (shwethags)
ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemath via sumasai)
ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown(yhemath via sumasai)
ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index fb782a2..f605c26 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -304,7 +304,14 @@ public class DefaultMetadataService implements MetadataService {
ParamChecker.notEmpty(entityTypeName, "Entity type cannot be null");
ClassType entityType = typeSystem.getDataType(ClassType.class, entityTypeName);
+
+ //Both assigned id and values are required for full update
+ //classtype.convert() will remove values if id is assigned. So, set temp id, convert and
+ // then replace with original id
+ Id origId = entityInstance.getId();
+ entityInstance.replaceWithNewId(new Id(entityInstance.getTypeName()));
ITypedReferenceableInstance typedInstrance = entityType.convert(entityInstance, Multiplicity.REQUIRED);
+ ((ReferenceableInstance)typedInstrance).replaceWithNewId(origId);
instances[index] = typedInstrance;
}
return instances;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/repository/src/test/java/org/apache/atlas/TestUtils.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/TestUtils.java b/repository/src/test/java/org/apache/atlas/TestUtils.java
index 12c47d4..2a45bc8 100755
--- a/repository/src/test/java/org/apache/atlas/TestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/TestUtils.java
@@ -251,12 +251,12 @@ public final class TestUtils {
new AttributeDefinition("columnsMap",
DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(),
"column_type"),
- Multiplicity.COLLECTION, true, null),
+ Multiplicity.OPTIONAL, true, null),
//map of structs
new AttributeDefinition("partitionsMap",
DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(),
"partition_type"),
- Multiplicity.COLLECTION, true, null),
+ Multiplicity.OPTIONAL, true, null),
// struct reference
new AttributeDefinition("serde1", "serdeType", Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("serde2", "serdeType", Multiplicity.OPTIONAL, false, null),
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
index 6fc7008..d824b50 100644
--- a/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java
@@ -19,12 +19,9 @@ package org.apache.atlas.repository.graph;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
-import com.thinkaurelius.titan.diskstorage.Backend;
-import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
-import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeTest;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
index 0352ef3..0307fd4 100644
--- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
@@ -22,9 +22,6 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
-import org.apache.atlas.typesystem.exception.TypeNotFoundException;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
-import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.TestUtils;
import org.apache.atlas.repository.graph.GraphProvider;
@@ -32,12 +29,15 @@ import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.exception.EntityNotFoundException;
+import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.EnumValue;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.ValueConversionException;
+import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.testng.Assert;
@@ -47,11 +47,15 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
@Guice(modules = RepositoryMetadataModule.class)
public class DefaultMetadataServiceTest {
@Inject
@@ -296,8 +300,8 @@ public class DefaultMetadataServiceTest {
Map<String, Object> values = new HashMap<>();
values.put("name", "col1");
values.put("type", "type");
- Referenceable ref = new Referenceable("column_type", values);
- columns.add(ref);
+ Referenceable col1 = new Referenceable("column_type", values);
+ columns.add(col1);
Referenceable tableUpdated = new Referenceable(TestUtils.TABLE_TYPE, new HashMap<String, Object>() {{
put("columns", columns);
}});
@@ -307,19 +311,18 @@ public class DefaultMetadataServiceTest {
metadataService.getEntityDefinition(TestUtils.TABLE_TYPE, "name", (String) table.get("name"));
Referenceable tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
final List<Referenceable> arrClsColumns = (List) tableDefinition.get("columns");
- Assert.assertTrue(arrClsColumns.get(0).equalsContents(columns.get(0)));
+ assertReferenceables(arrClsColumns.get(0), columns.get(0));
//Partial update. Add col5 But also update col1
Map<String, Object> valuesCol5 = new HashMap<>();
valuesCol5.put("name", "col5");
valuesCol5.put("type", "type");
- ref = new Referenceable("column_type", valuesCol5);
+ Referenceable col2 = new Referenceable("column_type", valuesCol5);
//update col1
- arrClsColumns.get(0).set("type", "type1");
+ col1.set("type", "type1");
//add col5
- final List<Referenceable> updateColumns = new ArrayList<>(arrClsColumns);
- updateColumns.add(ref);
+ final List<Referenceable> updateColumns = Arrays.asList(col1, col2);
tableUpdated = new Referenceable(TestUtils.TABLE_TYPE, new HashMap<String, Object>() {{
put("columns", updateColumns);
@@ -331,8 +334,8 @@ public class DefaultMetadataServiceTest {
tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
List<Referenceable> arrColumnsList = (List) tableDefinition.get("columns");
Assert.assertEquals(arrColumnsList.size(), 2);
- Assert.assertTrue(arrColumnsList.get(0).equalsContents(updateColumns.get(0)));
- Assert.assertTrue(arrColumnsList.get(1).equalsContents(updateColumns.get(1)));
+ assertReferenceables(arrColumnsList.get(0), updateColumns.get(0));
+ assertReferenceables(arrColumnsList.get(1), updateColumns.get(1));
//Complete update. Add array elements - col3,4
Map<String, Object> values1 = new HashMap<>();
@@ -355,9 +358,8 @@ public class DefaultMetadataServiceTest {
tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
arrColumnsList = (List) tableDefinition.get("columns");
Assert.assertEquals(arrColumnsList.size(), columns.size());
- Assert.assertTrue(arrColumnsList.get(1).equalsContents(columns.get(1)));
- Assert.assertTrue(arrColumnsList.get(2).equalsContents(columns.get(2)));
-
+ assertReferenceables(arrColumnsList.get(1), columns.get(1));
+ assertReferenceables(arrColumnsList.get(2), columns.get(2));
//Remove a class reference/Id and insert another reference
//Also covers isComposite case since columns is a composite
@@ -366,8 +368,8 @@ public class DefaultMetadataServiceTest {
values.put("name", "col2");
values.put("type", "type");
- ref = new Referenceable("column_type", values);
- columns.add(ref);
+ col1 = new Referenceable("column_type", values);
+ columns.add(col1);
table.set("columns", columns);
updateInstance(table);
@@ -376,7 +378,7 @@ public class DefaultMetadataServiceTest {
tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
arrColumnsList = (List) tableDefinition.get("columns");
Assert.assertEquals(arrColumnsList.size(), columns.size());
- Assert.assertTrue(arrColumnsList.get(0).equalsContents(columns.get(0)));
+ assertReferenceables(arrColumnsList.get(0), columns.get(0));
//Update array column to null
table.setNull("columns");
@@ -389,6 +391,14 @@ public class DefaultMetadataServiceTest {
Assert.assertNull(tableDefinition.get("columns"));
}
+ private void assertReferenceables(Referenceable r1, Referenceable r2) {
+ assertEquals(r1.getTypeName(), r2.getTypeName());
+ assertTrue(r1.getTraits().equals(r2.getTraits()));
+ for (String attr : r1.getValuesMap().keySet()) {
+ assertTrue(r1.getValuesMap().get(attr).equals(r2.getValuesMap().get(attr)));
+ }
+ //TODO assert trait instances and complex attributes
+ }
@Test
public void testStructs() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java b/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
index b8dcc7e..aa1736d 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java
@@ -33,7 +33,7 @@ import java.util.Map;
*/
public class Referenceable extends Struct implements IReferenceableInstance {
- private final Id id;
+ private Id id;
private final ImmutableMap<String, IStruct> traits;
private final ImmutableList<String> traitNames;
@@ -151,6 +151,10 @@ public class Referenceable extends Struct implements IReferenceableInstance {
'}';
}
+ public void replaceWithNewId(Id id) {
+ this.id = id;
+ }
+
private static Map<String, IStruct> getTraits(IReferenceableInstance instance) throws AtlasException {
Map<String, IStruct> traits = new HashMap<>();
for (String traitName : instance.getTraits() ) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java
index ac758fa..adf5f1c 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java
@@ -125,9 +125,9 @@ public class ClassType extends HierarchicalType<ClassType, IReferenceableInstanc
r != null ? createInstanceWithTraits(id, r, r.getTraits().toArray(new String[0])) :
createInstance(id);
-// if (id != null && id.isAssigned()) {
-// return tr;
-// }
+ if (id != null && id.isAssigned()) {
+ return tr;
+ }
for (Map.Entry<String, AttributeInfo> e : fieldMapping.fields.entrySet()) {
String attrKey = e.getKey();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
index 74d7f7c..a54dabc 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
@@ -25,8 +25,8 @@ public final class Multiplicity {
public static final Multiplicity OPTIONAL = new Multiplicity(0, 1, false);
public static final Multiplicity REQUIRED = new Multiplicity(1, 1, false);
- public static final Multiplicity COLLECTION = new Multiplicity(0, Integer.MAX_VALUE, false);
- public static final Multiplicity SET = new Multiplicity(0, Integer.MAX_VALUE, true);
+ public static final Multiplicity COLLECTION = new Multiplicity(1, Integer.MAX_VALUE, false);
+ public static final Multiplicity SET = new Multiplicity(1, Integer.MAX_VALUE, true);
public final int lower;
public final int upper;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/typesystem/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties
index 4a351e6..d475d7e 100644
--- a/typesystem/src/main/resources/application.properties
+++ b/typesystem/src/main/resources/application.properties
@@ -27,7 +27,7 @@ atlas.graph.storage.backend=${titan.storage.backend}
atlas.graph.index.search.backend=${titan.index.backend}
#Berkeley storage directory
-atlas.graph.storage.directory=target/data/berkley
+atlas.graph.storage.directory=${sys:atlas.data}/berkley
#hbase
#For standalone mode , specify localhost
@@ -38,7 +38,7 @@ atlas.graph.storage.hbase.regions-per-server=1
atlas.graph.storage.lock.wait-time=10000
#ElasticSearch
-atlas.graph.index.search.directory=target/data/es
+atlas.graph.index.search.directory=${sys:atlas.data}/es
atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000
@@ -63,7 +63,7 @@ atlas.notification.embedded=true
atlas.kafka.zookeeper.connect=localhost:19026
atlas.kafka.bootstrap.servers=localhost:19027
-atlas.kafka.data=target/data/kafka
+atlas.kafka.data=${sys:atlas.data}/kafka
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=100
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 85a5f94..d893947 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -372,6 +372,10 @@
<name>atlas.home</name>
<value>${project.build.directory}</value>
</systemProperty>
+ <systemProperty>
+ <name>atlas.data</name>
+ <value>${project.build.directory}/data</value>
+ </systemProperty>
</systemProperties>
<stopKey>atlas-stop</stopKey>
<stopPort>31001</stopPort>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/webapp/src/main/java/org/apache/atlas/Atlas.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/Atlas.java b/webapp/src/main/java/org/apache/atlas/Atlas.java
index 5ae1e44..2d2d619 100755
--- a/webapp/src/main/java/org/apache/atlas/Atlas.java
+++ b/webapp/src/main/java/org/apache/atlas/Atlas.java
@@ -40,6 +40,7 @@ public final class Atlas {
private static final String APP_PATH = "app";
private static final String APP_PORT = "port";
private static final String ATLAS_HOME = "atlas.home";
+ private static final String ATLAS_DATA = "atlas.data";
private static final String ATLAS_LOG_DIR = "atlas.log.dir";
public static final String ATLAS_SERVER_HTTPS_PORT = "atlas.server.https.port";
public static final String ATLAS_SERVER_HTTP_PORT = "atlas.server.http.port";
@@ -110,6 +111,9 @@ public final class Atlas {
if (System.getProperty(ATLAS_HOME) == null) {
System.setProperty(ATLAS_HOME, "target");
}
+ if (System.getProperty(ATLAS_DATA) == null) {
+ System.setProperty(ATLAS_DATA, "target/data");
+ }
if (System.getProperty(ATLAS_LOG_DIR) == null) {
System.setProperty(ATLAS_LOG_DIR, "target/logs");
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
index e5af26c..3a4661c 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
@@ -19,22 +19,22 @@
package org.apache.atlas.notification;
import com.google.inject.Inject;
+import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.web.resources.BaseResourceIT;
import org.codehaus.jettison.json.JSONArray;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+
@Guice(modules = NotificationModule.class)
public class NotificationHookConsumerIT extends BaseResourceIT {
@Inject
private NotificationInterface kafka;
- private String dbName;
@BeforeClass
public void setUp() throws Exception {
@@ -47,57 +47,106 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
kafka.close();
}
- private void sendHookMessage(Referenceable entity) throws NotificationException {
- String entityJson = InstanceSerialization.toJson(entity, true);
- JSONArray jsonArray = new JSONArray();
- jsonArray.put(entityJson);
- kafka.send(NotificationInterface.NotificationType.HOOK, jsonArray.toString());
+ private void sendHookMessage(HookNotification.HookNotificationMessage message) throws NotificationException {
+ kafka.send(NotificationInterface.NotificationType.HOOK, message);
}
@Test
- public void testConsumeHookMessage() throws Exception {
- Referenceable entity = new Referenceable(DATABASE_TYPE);
- dbName = "db" + randomString();
- entity.set("name", dbName);
+ public void testCreateEntity() throws Exception {
+ final Referenceable entity = new Referenceable(DATABASE_TYPE);
+ entity.set("name", "db" + randomString());
entity.set("description", randomString());
- sendHookMessage(entity);
+ sendHookMessage(new HookNotification.EntityCreateRequest(entity));
waitFor(1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
- JSONArray results =
- serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
+ JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
+ entity.get("name")));
return results.length() == 1;
}
});
}
- @Test (dependsOnMethods = "testConsumeHookMessage")
- public void testEnityDeduping() throws Exception {
-// Referenceable db = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
- Referenceable db = new Referenceable(DATABASE_TYPE);
- db.set("name", dbName);
- db.set("description", randomString());
+ @Test
+ public void testUpdateEntityPartial() throws Exception {
+ final Referenceable entity = new Referenceable(DATABASE_TYPE);
+ final String dbName = "db" + randomString();
+ entity.set("name", dbName);
+ entity.set("description", randomString());
+ serviceClient.createEntity(entity);
- Referenceable table = new Referenceable(HIVE_TABLE_TYPE);
- final String tableName = randomString();
- table.set("name", tableName);
- table.set("db", db);
+ final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
+ newEntity.set("owner", randomString());
+ sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
+ waitFor(1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ Referenceable localEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
+ return (localEntity.get("owner") != null && localEntity.get("owner").equals(newEntity.get("owner")));
+ }
+ });
+
+ //Its partial update and un-set fields are not updated
+ Referenceable actualEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
+ assertEquals(actualEntity.get("description"), entity.get("description"));
+ }
+
+ @Test
+ public void testUpdatePartialUpdatingQualifiedName() throws Exception {
+ final Referenceable entity = new Referenceable(DATABASE_TYPE);
+ final String dbName = "db" + randomString();
+ entity.set("name", dbName);
+ entity.set("description", randomString());
+ serviceClient.createEntity(entity);
- sendHookMessage(table);
+ final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
+ final String newName = "db" + randomString();
+ newEntity.set("name", newName);
+
+ sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
waitFor(1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
- JSONArray results =
- serviceClient.searchByDSL(String.format("%s where name='%s'", HIVE_TABLE_TYPE, tableName));
+ JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
+ newName));
return results.length() == 1;
}
});
- JSONArray results =
- serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
- Assert.assertEquals(results.length(), 1);
+ //no entity with the old qualified name
+ JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
+ assertEquals(results.length(), 0);
+
}
+ @Test
+ public void testUpdateEntityFullUpdate() throws Exception {
+ Referenceable entity = new Referenceable(DATABASE_TYPE);
+ final String dbName = "db" + randomString();
+ entity.set("name", dbName);
+ entity.set("description", randomString());
+ serviceClient.createEntity(entity);
+
+ final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
+ newEntity.set("name", dbName);
+ newEntity.set("description", randomString());
+ newEntity.set("owner", randomString());
+
+ //updating unique attribute
+ sendHookMessage(new HookNotification.EntityUpdateRequest(newEntity));
+ waitFor(1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
+ dbName));
+ return results.length() == 1;
+ }
+ });
+
+ Referenceable actualEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
+ assertEquals(actualEntity.get("description"), newEntity.get("description"));
+ assertEquals(actualEntity.get("owner"), newEntity.get("owner"));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
index 7337eaf..f476af3 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
@@ -58,6 +58,8 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import static org.testng.Assert.assertEquals;
+
/**
* Integration tests for Entity Jersey Resource.
*/
@@ -95,6 +97,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
}
@Test
+ //API should accept single entity (or jsonarray of entities)
public void testSubmitSingleEntity() throws Exception {
Referenceable databaseInstance = new Referenceable(DATABASE_TYPE);
databaseInstance.set("name", randomString());
@@ -115,6 +118,34 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
}
@Test
+ public void testEntityDeduping() throws Exception {
+ Referenceable db = new Referenceable(DATABASE_TYPE);
+ String dbName = "db" + randomString();
+ db.set("name", dbName);
+ db.set("description", randomString());
+
+ serviceClient.createEntity(db);
+ JSONArray results =
+ serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
+ assertEquals(results.length(), 1);
+
+ //create entity again shouldn't create another instance with same unique attribute value
+ serviceClient.createEntity(db);
+ results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
+ assertEquals(results.length(), 1);
+
+ //Test the same across references
+ Referenceable table = new Referenceable(HIVE_TABLE_TYPE);
+ final String tableName = randomString();
+ table.set("name", tableName);
+ table.set("db", db);
+
+ serviceClient.createEntity(table);
+ results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
+ assertEquals(results.length(), 1);
+ }
+
+ @Test
public void testEntityDefinitionAcrossTypeUpdate() throws Exception {
//create type
HierarchicalTypeDefinition<ClassType> typeDefinition = TypesUtil