You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/11/12 18:14:45 UTC
[36/42] atlas git commit: ATLAS-2251: Remove TypeSystem and related
implementation, to avoid unnecessary duplicate of type details in cache
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index f313ddc..caa72ce 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -18,13 +18,15 @@
package org.apache.atlas.notification;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.notification.MessageVersion;
+import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
-import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -41,8 +43,6 @@ import org.apache.kafka.common.TopicPartition;
*/
public class AbstractNotificationConsumerTest {
- private static final Gson GSON = new Gson();
-
@Test
public void testReceive() throws Exception {
Logger logger = mock(Logger.class);
@@ -54,27 +54,24 @@ public class AbstractNotificationConsumerTest {
List jsonList = new LinkedList<>();
- jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)));
- jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage2)));
- jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage3)));
- jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage4)));
-
- Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
+ jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)));
+ jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage2)));
+ jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage3)));
+ jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage4)));
- NotificationConsumer<TestMessage> consumer =
- new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
+ NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer(jsonList, logger);
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
assertFalse(messageList.isEmpty());
- assertEquals(testMessage1, messageList.get(0).getMessage());
+ assertEquals(messageList.get(0).getMessage(), testMessage1);
- assertEquals(testMessage2, messageList.get(1).getMessage());
+ assertEquals(messageList.get(1).getMessage(), testMessage2);
- assertEquals(testMessage3, messageList.get(2).getMessage());
+ assertEquals(messageList.get(2).getMessage(), testMessage3);
- assertEquals(testMessage4, messageList.get(3).getMessage());
+ assertEquals(messageList.get(3).getMessage(), testMessage4);
}
@Test
@@ -88,20 +85,17 @@ public class AbstractNotificationConsumerTest {
List jsonList = new LinkedList<>();
- String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
- String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.0.5"), testMessage2));
- String json3 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.5.0"), testMessage3));
- String json4 = GSON.toJson(testMessage4);
+ String json1 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
+ String json2 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("0.0.5"), testMessage2));
+ String json3 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("0.5.0"), testMessage3));
+ String json4 = AtlasType.toV1Json(testMessage4);
jsonList.add(json1);
jsonList.add(json2);
jsonList.add(json3);
jsonList.add(json4);
- Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
-
- NotificationConsumer<TestMessage> consumer =
- new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
+ NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer(jsonList, logger);
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
@@ -124,16 +118,13 @@ public class AbstractNotificationConsumerTest {
List jsonList = new LinkedList<>();
- String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
- String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), testMessage2));
+ String json1 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1));
+ String json2 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), testMessage2));
jsonList.add(json1);
jsonList.add(json2);
- Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType();
-
- NotificationConsumer<TestMessage> consumer =
- new TestNotificationConsumer<>(notificationMessageType, jsonList, logger);
+ NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer(jsonList, logger);
try {
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
@@ -150,7 +141,10 @@ public class AbstractNotificationConsumerTest {
private static class TestMessage {
private String s;
- private int i;
+ private int i;
+
+ public TestMessage() {
+ }
public TestMessage(String s, int i) {
this.s = s;
@@ -165,6 +159,14 @@ public class AbstractNotificationConsumerTest {
this.s = s;
}
+ public int getI() {
+ return i;
+ }
+
+ public void setI(int i) {
+ this.i = i;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -180,12 +182,14 @@ public class AbstractNotificationConsumerTest {
}
}
- private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> {
- private final List<T> messageList;
- private int index = 0;
+ private static class TestNotificationConsumer extends AbstractNotificationConsumer<TestMessage> {
+ private final List<TestMessage> messageList;
+ private int index = 0;
+
+
+ public TestNotificationConsumer(List<TestMessage> messages, Logger logger) {
+ super(new TestMessageDeserializer());
- public TestNotificationConsumer(Type notificationMessageType, List<T> messages, Logger logger) {
- super(new TestDeserializer<T>(notificationMessageType, logger));
this.messageList = messages;
}
@@ -205,24 +209,35 @@ public class AbstractNotificationConsumerTest {
}
@Override
- public List<AtlasKafkaMessage<T>> receive() {
+ public List<AtlasKafkaMessage<TestMessage>> receive() {
return receive(1000L);
}
@Override
- public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
- List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
+ public List<AtlasKafkaMessage<TestMessage>> receive(long timeoutMilliSeconds) {
+ List<AtlasKafkaMessage<TestMessage>> tempMessageList = new ArrayList();
for(Object json : messageList) {
- tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));
+ tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String) json), -1, -1));
}
return tempMessageList;
}
}
- private static final class TestDeserializer<T> extends AtlasNotificationMessageDeserializer<T> {
+ public static class TestMessageDeserializer extends AbstractMessageDeserializer<TestMessage> {
+ /**
+ * Logger for hook notification messages.
+ */
+ private static final Logger NOTIFICATION_LOGGER = LoggerFactory.getLogger(TestMessageDeserializer.class);
+
+
+ // ----- Constructors ----------------------------------------------------
- private TestDeserializer(Type notificationMessageType, Logger logger) {
- super(notificationMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger);
+ /**
+ * Create a hook notification message deserializer.
+ */
+ public TestMessageDeserializer() {
+ super(new TypeReference<TestMessage>() {}, new TypeReference<AtlasNotificationMessage<TestMessage>>() {},
+ AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER);
}
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index 655252c..94cb70d 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -19,12 +19,14 @@
package org.apache.atlas.notification;
import org.apache.atlas.AtlasException;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
+import org.apache.atlas.type.AtlasType;
import org.apache.commons.configuration.Configuration;
-import org.testng.annotations.Test;
import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -36,70 +38,64 @@ import static org.testng.Assert.*;
*/
public class AbstractNotificationTest {
- @Test
+ @org.testng.annotations.Test
public void testSend() throws Exception {
- Configuration configuration = mock(Configuration.class);
+ Configuration configuration = mock(Configuration.class);
+ TestNotification notification = new TestNotification(configuration);
+ Test message1 = new Test(HookNotificationType.ENTITY_CREATE, "user1");
+ Test message2 = new Test(HookNotificationType.TYPE_CREATE, "user1");
+ Test message3 = new Test(HookNotificationType.ENTITY_FULL_UPDATE, "user1");
+ List<String> messageJson = new ArrayList<>();
- TestNotification notification = new TestNotification(configuration);
-
- TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1");
- TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
- TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
-
- List<String> messageJson = new ArrayList<>();
AbstractNotification.createNotificationMessages(message1, messageJson);
AbstractNotification.createNotificationMessages(message2, messageJson);
AbstractNotification.createNotificationMessages(message3, messageJson);
- notification.send(NotificationInterface.NotificationType.HOOK, message1, message2, message3);
+ notification.send(NotificationType.HOOK, message1, message2, message3);
- assertEquals(NotificationInterface.NotificationType.HOOK, notification.type);
+ assertEquals(NotificationType.HOOK, notification.type);
assertEquals(3, notification.messages.size());
+
for (int i = 0; i < notification.messages.size(); i++) {
assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i));
}
}
- @Test
+ @org.testng.annotations.Test
public void testSend2() throws Exception {
- Configuration configuration = mock(Configuration.class);
-
- TestNotification notification = new TestNotification(configuration);
+ Configuration configuration = mock(Configuration.class);
+ TestNotification notification = new TestNotification(configuration);
+ Test message1 = new Test(HookNotificationType.ENTITY_CREATE, "user1");
+ Test message2 = new Test(HookNotificationType.TYPE_CREATE, "user1");
+ Test message3 = new Test(HookNotificationType.ENTITY_FULL_UPDATE, "user1");
+ List<Test> messages = Arrays.asList(message1, message2, message3);
+ List<String> messageJson = new ArrayList<>();
- TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1");
- TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1");
- TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1");
-
- List<TestMessage> messages = new LinkedList<>();
- messages.add(message1);
- messages.add(message2);
- messages.add(message3);
-
- List<String> messageJson = new ArrayList<>();
AbstractNotification.createNotificationMessages(message1, messageJson);
AbstractNotification.createNotificationMessages(message2, messageJson);
AbstractNotification.createNotificationMessages(message3, messageJson);
notification.send(NotificationInterface.NotificationType.HOOK, messages);
- assertEquals(notification.type, NotificationInterface.NotificationType.HOOK);
+ assertEquals(notification.type, NotificationType.HOOK);
assertEquals(notification.messages.size(), messageJson.size());
+
for (int i = 0; i < notification.messages.size(); i++) {
assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i));
}
}
- public static class TestMessage extends HookNotification.HookNotificationMessage {
+ public static class Test extends HookNotification {
- public TestMessage(HookNotification.HookNotificationType type, String user) {
+ public Test(HookNotificationType type, String user) {
super(type, user);
}
}
// ignore msgCreationTime in Json
private void assertEqualsMessageJson(String msgJsonActual, String msgJsonExpected) {
- Map<Object, Object> msgActual = AbstractNotification.GSON.fromJson(msgJsonActual, Map.class);
- Map<Object, Object> msgExpected = AbstractNotification.GSON.fromJson(msgJsonExpected, Map.class);
+ Map<Object, Object> msgActual = AtlasType.fromV1Json(msgJsonActual, Map.class);
+ Map<Object, Object> msgExpected = AtlasType.fromV1Json(msgJsonExpected, Map.class);
msgActual.remove("msgCreationTime");
msgExpected.remove("msgCreationTime");
@@ -119,7 +115,7 @@ public class AbstractNotificationTest {
protected void sendInternal(NotificationType notificationType, List<String> notificationMessages)
throws NotificationException {
- type = notificationType;
+ type = notificationType;
messages = notificationMessages;
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
index 27b5034..91a195d 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java
@@ -18,6 +18,8 @@
package org.apache.atlas.notification;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.MessageVersion;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
index d1af4b0..d8b3b34 100644
--- a/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java
@@ -18,6 +18,7 @@
package org.apache.atlas.notification;
+import org.apache.atlas.model.notification.MessageVersion;
import org.testng.annotations.Test;
import java.util.Arrays;
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
index 0807221..b79735a 100644
--- a/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java
@@ -17,7 +17,8 @@
*/
package org.apache.atlas.notification;
-import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
import org.testng.Assert;
import org.testng.annotations.Test;
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
deleted file mode 100644
index 7b513da..0000000
--- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification.entity;
-
-import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.Struct;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * EntityMessageDeserializer tests.
- */
-public class EntityMessageDeserializerTest {
-
- @Test
- public void testDeserialize() throws Exception {
- EntityMessageDeserializer deserializer = new EntityMessageDeserializer();
-
- Referenceable entity = EntityNotificationImplTest.getEntity("id");
- String traitName = "MyTrait";
- List<IStruct> traitInfo = new LinkedList<>();
- IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
- traitInfo.add(trait);
-
- EntityNotificationImpl notification =
- new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
-
- List<String> jsonMsgList = new ArrayList<>();
-
- AbstractNotification.createNotificationMessages(notification, jsonMsgList);
-
- EntityNotification deserializedNotification = null;
-
- for (String jsonMsg : jsonMsgList) {
- deserializedNotification = deserializer.deserialize(jsonMsg);
-
- if (deserializedNotification != null) {
- break;
- }
- }
-
- assertEquals(deserializedNotification.getOperationType(), notification.getOperationType());
- assertEquals(deserializedNotification.getEntity().getId(), notification.getEntity().getId());
- assertEquals(deserializedNotification.getEntity().getTypeName(), notification.getEntity().getTypeName());
- assertEquals(deserializedNotification.getEntity().getTraits(), notification.getEntity().getTraits());
- assertEquals(deserializedNotification.getEntity().getTrait(traitName),
- notification.getEntity().getTrait(traitName));
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java
new file mode 100644
index 0000000..13eafb6
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.entity;
+
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * EntityMessageDeserializer tests.
+ */
+public class EntityNotificationDeserializerTest {
+ private EntityMessageDeserializer deserializer = new EntityMessageDeserializer();
+
+ @Test
+ public void testDeserialize() throws Exception {
+ Referenceable entity = EntityNotificationTest.getEntity("id");
+ String traitName = "MyTrait";
+ List<Struct> traits = Collections.singletonList(new Struct(traitName, Collections.<String, Object>emptyMap()));
+ EntityNotificationV1 notification = new EntityNotificationV1(entity, EntityNotificationV1.OperationType.TRAIT_ADD, traits);
+ List<String> jsonMsgList = new ArrayList<>();
+
+ AbstractNotification.createNotificationMessages(notification, jsonMsgList);
+
+ EntityNotification deserializedNotification = null;
+
+ for (String jsonMsg : jsonMsgList) {
+ deserializedNotification = deserializer.deserialize(jsonMsg);
+
+ if (deserializedNotification != null) {
+ break;
+ }
+ }
+
+ assertTrue(deserializedNotification instanceof EntityNotificationV1);
+
+ EntityNotificationV1 entityNotificationV1 = (EntityNotificationV1)deserializedNotification;
+
+ assertEquals(entityNotificationV1.getOperationType(), notification.getOperationType());
+ assertEquals(entityNotificationV1.getEntity().getId(), notification.getEntity().getId());
+ assertEquals(entityNotificationV1.getEntity().getTypeName(), notification.getEntity().getTypeName());
+ assertEquals(entityNotificationV1.getEntity().getTraits(), notification.getEntity().getTraits());
+ assertEquals(entityNotificationV1.getEntity().getTrait(traitName), notification.getEntity().getTrait(traitName));
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationImplTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationImplTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationImplTest.java
deleted file mode 100644
index c3a2db8..0000000
--- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationImplTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification.entity;
-
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.types.TraitType;
-import org.apache.atlas.typesystem.types.TypeSystem;
-import org.testng.annotations.Test;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-/**
- * EntityNotificationImpl tests.
- */
-public class EntityNotificationImplTest {
-
- @Test
- public void testGetEntity() throws Exception {
- Referenceable entity = getEntity("id");
-
- EntityNotificationImpl entityNotification =
- new EntityNotificationImpl(entity, EntityNotification.OperationType.ENTITY_CREATE,
- Collections.<IStruct>emptyList());
-
- assertEquals(entity, entityNotification.getEntity());
- }
-
- @Test
- public void testGetOperationType() throws Exception {
- Referenceable entity = getEntity("id");
-
- EntityNotificationImpl entityNotification =
- new EntityNotificationImpl(entity, EntityNotification.OperationType.ENTITY_CREATE,
- Collections.<IStruct>emptyList());
-
- assertEquals(EntityNotification.OperationType.ENTITY_CREATE, entityNotification.getOperationType());
- }
-
- @Test
- public void testGetAllTraits() throws Exception {
- Referenceable entity = getEntity("id");
- String traitName = "MyTrait";
- List<IStruct> traitInfo = new LinkedList<>();
- IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap());
- traitInfo.add(trait);
-
- EntityNotificationImpl entityNotification =
- new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo);
-
- assertEquals(traitInfo, entityNotification.getAllTraits());
- }
-
- @Test
- public void testGetAllTraitsSuperTraits() throws Exception {
-
- TypeSystem typeSystem = mock(TypeSystem.class);
-
- String traitName = "MyTrait";
- IStruct myTrait = new Struct(traitName);
-
- String superTraitName = "MySuperTrait";
-
- TraitType traitDef = mock(TraitType.class);
- Set<String> superTypeNames = Collections.singleton(superTraitName);
-
- TraitType superTraitDef = mock(TraitType.class);
- Set<String> superSuperTypeNames = Collections.emptySet();
-
- Referenceable entity = getEntity("id", myTrait);
-
- when(typeSystem.getDataType(TraitType.class, traitName)).thenReturn(traitDef);
- when(typeSystem.getDataType(TraitType.class, superTraitName)).thenReturn(superTraitDef);
-
- when(traitDef.getAllSuperTypeNames()).thenReturn(superTypeNames);
- when(superTraitDef.getAllSuperTypeNames()).thenReturn(superSuperTypeNames);
-
- EntityNotificationImpl entityNotification =
- new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, typeSystem);
-
- List<IStruct> allTraits = entityNotification.getAllTraits();
-
- assertEquals(2, allTraits.size());
-
- for (IStruct trait : allTraits) {
- String typeName = trait.getTypeName();
- assertTrue(typeName.equals(traitName) || typeName.equals(superTraitName));
- }
- }
-
- @Test
- public void testEquals() throws Exception {
- Referenceable entity = getEntity("id");
-
- EntityNotificationImpl entityNotification2 =
- new EntityNotificationImpl(entity, EntityNotification.OperationType.ENTITY_CREATE,
- Collections.<IStruct>emptyList());
-
- EntityNotificationImpl entityNotification =
- new EntityNotificationImpl(entity, EntityNotification.OperationType.ENTITY_CREATE,
- Collections.<IStruct>emptyList());
-
- assertTrue(entityNotification.equals(entityNotification2));
- assertTrue(entityNotification2.equals(entityNotification));
- }
-
- public static Referenceable getEntity(String id, IStruct... traits) {
- String typeName = "typeName";
- Map<String, Object> values = new HashMap<>();
-
- List<String> traitNames = new LinkedList<>();
- Map<String, IStruct> traitMap = new HashMap<>();
-
- for (IStruct trait : traits) {
- String traitName = trait.getTypeName();
-
- traitNames.add(traitName);
- traitMap.put(traitName, trait);
- }
- return new Referenceable(id, typeName, values, traitNames, traitMap);
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java
new file mode 100644
index 0000000..232b21d
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.entity;
+
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1.OperationType;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Tests for {@link EntityNotificationV1}: accessors, super-trait expansion and equality.
+ */
+public class EntityNotificationTest {
+
+    @Test
+    public void testGetEntity() throws Exception {
+        Referenceable entity = getEntity("id");
+        EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList());
+
+        // TestNG's assertEquals takes (actual, expected), the reverse of JUnit's order
+        assertEquals(entityNotification.getEntity(), entity);
+    }
+
+    @Test
+    public void testGetOperationType() throws Exception {
+        Referenceable entity = getEntity("id");
+        EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList());
+
+        assertEquals(entityNotification.getOperationType(), OperationType.ENTITY_CREATE);
+    }
+
+    @Test
+    public void testGetAllTraits() throws Exception {
+        Referenceable entity = getEntity("id");
+        List<Struct> traitInfo = Collections.singletonList(new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+
+        EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.TRAIT_ADD, traitInfo);
+
+        assertEquals(entityNotification.getAllTraits(), traitInfo);
+    }
+
+    @Test
+    public void testGetAllTraitsSuperTraits() throws Exception {
+        AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
+        String traitName = "MyTrait";
+        Struct myTrait = new Struct(traitName);
+        String superTraitName = "MySuperTrait";
+        AtlasClassificationType traitType = mock(AtlasClassificationType.class);
+        Set<String> superTypeNames = Collections.singleton(superTraitName);
+        AtlasClassificationType superTraitType = mock(AtlasClassificationType.class);
+        Set<String> superSuperTypeNames = Collections.emptySet();
+        Referenceable entity = getEntity("id", myTrait);
+        // MyTrait declares MySuperTrait as its only super-type; MySuperTrait has none
+        when(typeRegistry.getClassificationTypeByName(traitName)).thenReturn(traitType);
+        when(typeRegistry.getClassificationTypeByName(superTraitName)).thenReturn(superTraitType);
+
+        when(traitType.getAllSuperTypes()).thenReturn(superTypeNames);
+        when(superTraitType.getAllSuperTypes()).thenReturn(superSuperTypeNames);
+
+        EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.TRAIT_ADD, typeRegistry);
+
+        List<Struct> allTraits = entityNotification.getAllTraits();
+        // the trait itself plus its super-trait
+        assertEquals(allTraits.size(), 2);
+
+        for (Struct trait : allTraits) {
+            String typeName = trait.getTypeName();
+
+            assertTrue(typeName.equals(traitName) || typeName.equals(superTraitName));
+        }
+    }
+
+    @Test
+    public void testEquals() throws Exception {
+        Referenceable entity = getEntity("id");
+        EntityNotificationV1 entityNotification2 = new EntityNotificationV1(entity, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList());
+        EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList());
+        // equality is checked in both directions
+        assertTrue(entityNotification.equals(entityNotification2));
+        assertTrue(entityNotification2.equals(entityNotification));
+    }
+
+    /**
+     * Builds a Referenceable of type "typeName" with the given id, an empty value map and the given traits.
+     */
+    public static Referenceable getEntity(String id, Struct... traits) {
+        List<String> traitNames = new LinkedList<>();
+        Map<String, Struct> traitMap = new HashMap<>();
+        for (Struct trait : traits) {
+            String traitName = trait.getTypeName();
+            traitNames.add(traitName);
+            traitMap.put(traitName, trait);
+        }
+
+        return new Referenceable(id, "typeName", new HashMap<String, Object>(), traitNames, traitMap);
+    }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
deleted file mode 100644
index 49b877b..0000000
--- a/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification.hook;
-
-import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.notification.entity.EntityNotificationImplTest;
-import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
-import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.Struct;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-/**
- * HookMessageDeserializer tests.
- */
-public class HookMessageDeserializerTest {
- HookMessageDeserializer deserializer = new HookMessageDeserializer();
-
- @Test
- public void testDeserialize() throws Exception {
- Referenceable entity = generateEntityWithTrait();
- EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
-
- List<String> jsonMsgList = new ArrayList<>();
-
- AbstractNotification.createNotificationMessages(message, jsonMsgList);
-
- HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
-
- assertEqualMessage(deserializedMessage, message);
- }
-
- // validate deserialization of legacy message, which doesn't use MessageVersion
- @Test
- public void testDeserializeLegacyMessage() throws Exception {
- Referenceable entity = generateEntityWithTrait();
- EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
-
- String jsonMsg = AbstractNotification.GSON.toJson(message);
- HookNotificationMessage deserializedMessage = deserializer.deserialize(jsonMsg);
-
- assertEqualMessage(deserializedMessage, message);
- }
-
- @Test
- public void testDeserializeCompressedMessage() throws Exception {
- Referenceable entity = generateLargeEntityWithTrait();
- EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
-
- List<String> jsonMsgList = new ArrayList<>();
-
- AbstractNotification.createNotificationMessages(message, jsonMsgList);
-
- assertTrue(jsonMsgList.size() == 1);
-
- String compressedMsg = jsonMsgList.get(0);
- String uncompressedMsg = AbstractNotification.GSON.toJson(message);
-
- assertTrue(compressedMsg.length() < uncompressedMsg.length(), "Compressed message (" + compressedMsg.length() + ") should be shorter than uncompressed message (" + uncompressedMsg.length() + ")");
-
- HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
-
- assertEqualMessage(deserializedMessage, message);
- }
-
- @Test
- public void testDeserializeSplitMessage() throws Exception {
- Referenceable entity = generateVeryLargeEntityWithTrait();
- EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
-
- List<String> jsonMsgList = new ArrayList<>();
-
- AbstractNotification.createNotificationMessages(message, jsonMsgList);
-
- assertTrue(jsonMsgList.size() > 1);
-
- HookNotificationMessage deserializedMessage = deserialize(jsonMsgList);
-
- assertEqualMessage(deserializedMessage, message);
- }
-
- private Referenceable generateEntityWithTrait() {
- Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
-
- return ret;
- }
-
- private HookNotificationMessage deserialize(List<String> jsonMsgList) {
- HookNotificationMessage deserializedMessage = null;
-
- for (String jsonMsg : jsonMsgList) {
- deserializedMessage = deserializer.deserialize(jsonMsg);
-
- if (deserializedMessage != null) {
- break;
- }
- }
-
- return deserializedMessage;
- }
-
- private void assertEqualMessage(HookNotificationMessage deserializedMessage, EntityUpdateRequest message) throws Exception {
- assertNotNull(deserializedMessage);
- assertEquals(deserializedMessage.getType(), message.getType());
- assertEquals(deserializedMessage.getUser(), message.getUser());
-
- assertTrue(deserializedMessage instanceof EntityUpdateRequest);
-
- EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) deserializedMessage;
- Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0);
- Referenceable entity = message.getEntities().get(0);
- String traitName = entity.getTraits().get(0);
-
- assertEquals(deserializedEntity.getId(), entity.getId());
- assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
- assertEquals(deserializedEntity.getTraits(), entity.getTraits());
- assertEquals(deserializedEntity.getTrait(traitName).hashCode(), entity.getTrait(traitName).hashCode());
-
- }
-
- private Referenceable generateLargeEntityWithTrait() {
- Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
-
- // add 100 attributes, each with value of size 10k
- // Json Size=1,027,984; GZipped Size=16,387 ==> will compress, but not split
- String attrValue = RandomStringUtils.randomAlphanumeric(10 * 1024); // use the same value for all attributes - to aid better compression
- for (int i = 0; i < 100; i++) {
- ret.set("attr_" + i, attrValue);
- }
-
- return ret;
- }
-
- private Referenceable generateVeryLargeEntityWithTrait() {
- Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
-
- // add 300 attributes, each with value of size 10k
- // Json Size=3,082,384; GZipped Size=2,313,357 ==> will compress & split
- for (int i = 0; i < 300; i++) {
- ret.set("attr_" + i, RandomStringUtils.randomAlphanumeric(10 * 1024));
- }
-
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java
new file mode 100644
index 0000000..d048170
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.hook;
+
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.entity.EntityNotificationTest;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Tests for {@link HookMessageDeserializer}: regular, legacy, compressed and split messages.
+ */
+public class HookNotificationDeserializerTest {
+    private final HookMessageDeserializer deserializer = new HookMessageDeserializer();
+
+    @Test
+    public void testDeserialize() throws Exception {
+        Referenceable entity = generateEntityWithTrait();
+        EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+        List<String> jsonMsgList = new ArrayList<>();
+
+        AbstractNotification.createNotificationMessages(message, jsonMsgList);
+
+        HookNotification deserializedMessage = deserialize(jsonMsgList);
+
+        assertEqualMessage(deserializedMessage, message);
+    }
+
+    // validate deserialization of legacy message, which doesn't use MessageVersion
+    @Test
+    public void testDeserializeLegacyMessage() throws Exception {
+        Referenceable entity = generateEntityWithTrait();
+        EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+        String jsonMsg = AtlasType.toV1Json(message);
+        HookNotification deserializedMessage = deserialize(Collections.singletonList(jsonMsg));
+
+        assertEqualMessage(deserializedMessage, message);
+    }
+
+    @Test
+    public void testDeserializeCompressedMessage() throws Exception {
+        Referenceable entity = generateLargeEntityWithTrait();
+        EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+        List<String> jsonMsgList = new ArrayList<>();
+
+        AbstractNotification.createNotificationMessages(message, jsonMsgList);
+
+        assertEquals(jsonMsgList.size(), 1);
+
+        String compressedMsg = jsonMsgList.get(0);
+        String uncompressedMsg = AtlasType.toV1Json(message);
+
+        assertTrue(compressedMsg.length() < uncompressedMsg.length(), "Compressed message (" + compressedMsg.length() + ") should be shorter than uncompressed message (" + uncompressedMsg.length() + ")");
+
+        HookNotification deserializedMessage = deserialize(jsonMsgList);
+
+        assertEqualMessage(deserializedMessage, message);
+    }
+
+    @Test
+    public void testDeserializeSplitMessage() throws Exception {
+        Referenceable entity = generateVeryLargeEntityWithTrait();
+        EntityUpdateRequest message = new EntityUpdateRequest("user1", entity);
+        List<String> jsonMsgList = new ArrayList<>();
+
+        AbstractNotification.createNotificationMessages(message, jsonMsgList);
+        // the entity is sized so that one notification message is not enough
+        assertTrue(jsonMsgList.size() > 1, "Message should have been split, but got " + jsonMsgList.size() + " part(s)");
+
+        HookNotification deserializedMessage = deserialize(jsonMsgList);
+
+        assertEqualMessage(deserializedMessage, message);
+    }
+
+    // minimal entity carrying a single trait, "MyTrait"; shared by all generators below
+    private Referenceable generateEntityWithTrait() {
+        return EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap()));
+    }
+
+    // feeds the messages, in order, to the deserializer and returns its first non-null result
+    private HookNotification deserialize(List<String> jsonMsgList) {
+        HookNotification deserializedMessage = null;
+
+        for (String jsonMsg : jsonMsgList) {
+            deserializedMessage = deserializer.deserialize(jsonMsg);
+
+            if (deserializedMessage != null) {
+                break;
+            }
+        }
+
+        return deserializedMessage;
+    }
+
+    private void assertEqualMessage(HookNotification deserializedMessage, EntityUpdateRequest message) throws Exception {
+        assertNotNull(deserializedMessage);
+        assertEquals(deserializedMessage.getType(), message.getType());
+        assertEquals(deserializedMessage.getUser(), message.getUser());
+
+        assertTrue(deserializedMessage instanceof EntityUpdateRequest);
+
+        EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) deserializedMessage;
+        Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0);
+        Referenceable entity = message.getEntities().get(0);
+        String traitName = entity.getTraitNames().get(0);
+
+        assertEquals(deserializedEntity.getId(), entity.getId());
+        assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
+        assertEquals(deserializedEntity.getTraits(), entity.getTraits());
+        // NOTE(review): the named trait is compared by hashCode only - confirm whether equals() could be used
+        assertEquals(deserializedEntity.getTrait(traitName).hashCode(), entity.getTrait(traitName).hashCode());
+    }
+
+    private Referenceable generateLargeEntityWithTrait() {
+        Referenceable ret = generateEntityWithTrait();
+
+        // add 100 attributes, each with value of size 10k
+        // Json Size=1,027,984; GZipped Size=16,387 ==> will compress, but not split
+        String attrValue = RandomStringUtils.randomAlphanumeric(10 * 1024); // use the same value for all attributes - to aid better compression
+        for (int i = 0; i < 100; i++) {
+            ret.set("attr_" + i, attrValue);
+        }
+
+        return ret;
+    }
+
+    private Referenceable generateVeryLargeEntityWithTrait() {
+        Referenceable ret = generateEntityWithTrait();
+
+        // add 300 attributes, each with value of size 10k
+        // Json Size=3,082,384; GZipped Size=2,313,357 ==> will compress & split
+        for (int i = 0; i < 300; i++) {
+            ret.set("attr_" + i, RandomStringUtils.randomAlphanumeric(10 * 1024));
+        }
+
+        return ret;
+    }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
index dd3257e..cf691af 100644
--- a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
@@ -17,16 +17,19 @@
*/
package org.apache.atlas.notification.hook;
-import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
-public class HookNotificationTest {
- public static final HookMessageDeserializer HOOK_MESSAGE_DESERIALIZER = new HookMessageDeserializer();
+public class HookNotificationTest {
+ private HookMessageDeserializer deserializer = new HookMessageDeserializer();
@Test
public void testNewMessageSerDe() throws Exception {
@@ -35,19 +38,21 @@ public class HookNotificationTest {
entity1.set("complex", new Referenceable("othertype"));
Referenceable entity2 = new Referenceable("newtype");
String user = "user";
- HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(user, entity1, entity2);
- String notificationJson = AbstractNotification.GSON.toJson(request);
- HookNotification.HookNotificationMessage actualNotification =
- HOOK_MESSAGE_DESERIALIZER.deserialize(notificationJson);
+ EntityCreateRequest request = new EntityCreateRequest(user, entity1, entity2);
+ String notificationJson = AtlasType.toV1Json(request);
+ HookNotification actualNotification = deserializer.deserialize(notificationJson);
- assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
+ assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE);
assertEquals(actualNotification.getUser(), user);
+ assertTrue(actualNotification instanceof EntityCreateRequest);
+
+ EntityCreateRequest createRequest = (EntityCreateRequest) actualNotification;
- HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) actualNotification;
assertEquals(createRequest.getEntities().size(), 2);
Referenceable actualEntity1 = createRequest.getEntities().get(0);
+
assertEquals(actualEntity1.getTypeName(), "sometype");
assertEquals(((Referenceable)actualEntity1.get("complex")).getTypeName(), "othertype");
assertEquals(createRequest.getEntities().get(1).getTypeName(), "newtype");
@@ -58,9 +63,10 @@ public class HookNotificationTest {
//Code to generate the json, use it for hard-coded json used later in this test
Referenceable entity = new Referenceable("sometype");
entity.set("attr", "value");
- HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(null, entity);
- String notificationJsonFromCode = AbstractNotification.GSON.toJson(request);
+ EntityCreateRequest request = new EntityCreateRequest(null, entity);
+ String notificationJsonFromCode = AtlasType.toV1Json(request);
+
System.out.println(notificationJsonFromCode);
//Json without user and assert that the string can be deserialised
@@ -87,11 +93,9 @@ public class HookNotificationTest {
+ "}";
- HookNotification.HookNotificationMessage actualNotification =
- HOOK_MESSAGE_DESERIALIZER.deserialize(notificationJson);
+ HookNotification actualNotification = deserializer.deserialize(notificationJson);
- assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
- assertNull(actualNotification.user);
- assertEquals(actualNotification.getUser(), HookNotification.HookNotificationMessage.UNKNOW_USER);
+ assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE);
+ assertEquals(actualNotification.getUser(), HookNotification.UNKNOW_USER);
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3c48b86..cbf9e2a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -535,13 +535,10 @@
<hbase.version>1.1.2</hbase.version>
<solr.version>5.5.1</solr.version>
<kafka.version>0.10.0.0</kafka.version>
+ <kafka.scala.binary.version>2.11</kafka.scala.binary.version>
<curator.version>2.11.0</curator.version>
<zookeeper.version>3.4.6</zookeeper.version>
- <!-- scala versions -->
- <scala.version>2.11.8</scala.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala.macros.version>2.0.1</scala.macros.version>
<json.version>3.2.11</json.version>
<log4j.version>1.2.17</log4j.version>
<akka.version>2.3.7</akka.version>
@@ -582,7 +579,7 @@
<jetty-maven-plugin.stopWait>10</jetty-maven-plugin.stopWait>
<!-- The following graph.{storage|index}.backend definitions dictate the backends for test
- resources in typesystem and each of the graph profiles. They do not affect packaging
+ resources in intg and each of the graph profiles. They do not affect packaging
which is handled by defaults and profiles set in atlas-distro POM -->
<graph.storage.backend>berkeleyje</graph.storage.backend>
<graph.index.backend>elasticsearch</graph.index.backend>
@@ -725,7 +722,6 @@
<module>build-tools</module>
<module>intg</module>
<module>common</module>
- <module>typesystem</module>
<module>server-api</module>
<module>notification</module>
<module>client</module>
@@ -748,7 +744,7 @@
<module>addons/sqoop-bridge</module>
<module>addons/storm-bridge-shim</module>
<module>addons/storm-bridge</module>
- <module>addons/hbase-bridge-shim</module>
+ <module>addons/hbase-bridge-shim</module>
<module>addons/hbase-bridge</module>
<module>distro</module>
</modules>
@@ -804,7 +800,6 @@
<dependencyManagement>
<dependencies>
-
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -1336,20 +1331,6 @@
<dependency>
<groupId>org.apache.atlas</groupId>
- <artifactId>atlas-typesystem</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.atlas</groupId>
- <artifactId>atlas-typesystem</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.atlas</groupId>
<artifactId>atlas-graphdb-api</artifactId>
<version>${project.version}</version>
</dependency>
@@ -1531,109 +1512,6 @@
</dependency>
- <!--Scala dependencies-->
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- <version>${scala.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- <version>${scala.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-actors</artifactId>
- <version>${scala.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scalap</artifactId>
- <version>${scala.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-native_${scala.binary.version}</artifactId>
- <version>${json.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.github.nscala-time</groupId>
- <artifactId>nscala-time_${scala.binary.version}</artifactId>
- <version>1.6.0</version>
- </dependency>
-
- <dependency>
- <groupId>com.typesafe</groupId>
- <artifactId>config</artifactId>
- <version>1.2.1</version>
- </dependency>
-
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-actor_${scala.binary.version}</artifactId>
- <version>${akka.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-testkit_${scala.binary.version}</artifactId>
- <version>${akka.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
- <version>${akka.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.spray</groupId>
- <artifactId>spray-routing</artifactId>
- <version>${spray.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.spray</groupId>
- <artifactId>spray-can</artifactId>
- <version>${spray.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.spray</groupId>
- <artifactId>spray-httpx</artifactId>
- <version>${spray.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.spray</groupId>
- <artifactId>spray-testkit</artifactId>
- <version>${spray.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>${gson.version}</version>
- </dependency>
-
- <dependency>
- <groupId>it.unimi.dsi</groupId>
- <artifactId>fastutil</artifactId>
- <version>${fastutil.version}</version>
- </dependency>
-
<!-- API documentation -->
<dependency>
<groupId>com.webcohesion.enunciate</groupId>
@@ -1704,7 +1582,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
+ <artifactId>kafka_${kafka.scala.binary.version}</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
@@ -1778,10 +1656,6 @@
<directory>src/test/resources</directory>
<filtering>true</filtering>
</testResource>
- <testResource>
- <directory>typesystem/src/test/resources</directory>
- <filtering>true</filtering>
- </testResource>
</testResources>
<pluginManagement>
@@ -1796,6 +1670,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
</plugin>
<plugin>
@@ -1927,16 +1805,6 @@
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.7.0</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- <optimize>true</optimize>
- </configuration>
- </plugin>
<plugin>
<groupId>org.apache.felix</groupId>
@@ -2028,12 +1896,6 @@
</plugin>
<plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.2.0</version>
- </plugin>
-
- <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>buildnumber-maven-plugin</artifactId>
<executions>
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/pom.xml
----------------------------------------------------------------------
diff --git a/repository/pom.xml b/repository/pom.xml
index f981c76..51ddb76 100755
--- a/repository/pom.xml
+++ b/repository/pom.xml
@@ -32,7 +32,6 @@
<packaging>jar</packaging>
<dependencies>
-
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-intg</artifactId>
@@ -40,11 +39,6 @@
<dependency>
<groupId>org.apache.atlas</groupId>
- <artifactId>atlas-typesystem</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.atlas</groupId>
<artifactId>atlas-server-api</artifactId>
</dependency>
@@ -58,6 +52,11 @@
<artifactId>atlas-graphdb-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
@@ -69,36 +68,6 @@
</dependency>
<dependency>
- <groupId>com.googlecode.json-simple</groupId>
- <artifactId>json-simple</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-actors</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scalap</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.skyscreamer</groupId>
- <artifactId>jsonassert</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
@@ -108,13 +77,6 @@
<artifactId>mockito-all</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.atlas</groupId>
- <artifactId>atlas-typesystem</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
-
<!-- Test dependencies -->
<dependency>
@@ -173,43 +135,6 @@
<build>
<plugins>
<plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.2.0</version>
- <executions>
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>scala-test-compile-first</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <scalaVersion>${scala.version}</scalaVersion>
- <recompileMode>incremental</recompileMode>
- <useZincServer>true</useZincServer>
- <source>1.7</source>
- <target>1.7</target>
- <args>
- <arg>-unchecked</arg>
- <arg>-deprecation</arg>
- <arg>-feature</arg>
- </args>
- <jvmArgs>
- <jvmArg>-Xmx512m</jvmArg>
- </jvmArgs>
- </configuration>
- </plugin>
-
- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index c6a4bbe..b3e690f 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -22,7 +22,7 @@ import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.typesystem.exception.NotFoundException;
+import org.apache.atlas.exception.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java
index 5f49625..8dc6d3a 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java
@@ -22,6 +22,7 @@ package org.apache.atlas.discovery;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.v1.model.lineage.SchemaResponse.SchemaDetails;
public interface AtlasLineageService {
/**
@@ -32,4 +33,19 @@ public interface AtlasLineageService {
*/
AtlasLineageInfo getAtlasLineageInfo(String entityGuid, LineageDirection direction, int depth) throws AtlasBaseException;
+ /**
+ * Return the schema for the given datasetName.
+ *
+ * @param datasetName name of the dataset whose schema is requested
+ * @return schema of the dataset, as a {@link SchemaDetails} object
+ */
+ SchemaDetails getSchemaForHiveTableByName(String datasetName) throws AtlasBaseException;
+
+ /**
+ * Return the schema for the given entity id.
+ *
+ * @param guid id (guid) of the entity whose schema is requested
+ * @return schema of the entity, as a {@link SchemaDetails} object
+ */
+ SchemaDetails getSchemaForHiveTableByGuid(String guid) throws AtlasBaseException;
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
deleted file mode 100644
index af7f1b4..0000000
--- a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.discovery;
-
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasConfiguration;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.annotation.GraphTransaction;
-import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
-import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
-import org.apache.atlas.query.GremlinQueryResult;
-import org.apache.atlas.query.InputLineageClosureQuery;
-import org.apache.atlas.query.OutputLineageClosureQuery;
-import org.apache.atlas.query.QueryParams;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.MetadataRepository;
-import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
-import org.apache.atlas.typesystem.exception.SchemaNotFoundException;
-import org.apache.atlas.typesystem.persistence.Id;
-import org.apache.atlas.typesystem.types.TypeUtils;
-import org.apache.atlas.utils.ParamChecker;
-import org.apache.commons.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-import scala.Option;
-import scala.Some;
-import scala.collection.JavaConversions;
-import scala.collection.immutable.List;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-import java.util.Arrays;
-import java.util.Iterator;
-
-/**
- * Hive implementation of Lineage service interface.
- */
-@Singleton
-@Component
-public class DataSetLineageService implements LineageService {
-
- private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageService.class);
-
- private static final Option<List<String>> SELECT_ATTRIBUTES =
- Some.apply(JavaConversions.asScalaBuffer(Arrays.asList(AtlasClient.NAME,
- AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)).toList());
- public static final String SELECT_INSTANCE_GUID = "__guid";
-
- public static final String DATASET_SCHEMA_QUERY_PREFIX = "atlas.lineage.schema.query.";
-
- private static final String HIVE_PROCESS_TYPE_NAME = "Process";
- private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputs";
- private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs";
-
- private static final Configuration propertiesConf;
-
- static {
- try {
- propertiesConf = ApplicationProperties.get();
- } catch (AtlasException e) {
- throw new RuntimeException(e);
- }
- }
-
-
- private final AtlasGraph graph;
- private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
- private final GraphBackedDiscoveryService discoveryService;
-
- @Inject
- DataSetLineageService(MetadataRepository metadataRepository,
- GraphBackedDiscoveryService discoveryService,
- AtlasGraph atlasGraph) throws DiscoveryException {
- this.graph = atlasGraph;
- this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
- this.discoveryService = discoveryService;
- }
-
- /**
- * Return the lineage outputs graph for the given datasetName.
- *
- * @param datasetName datasetName
- * @return Outputs Graph as JSON
- */
- @Override
- @GraphTransaction
- public String getOutputsGraph(String datasetName) throws AtlasException {
- LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName);
- datasetName = ParamChecker.notEmpty(datasetName, "dataset name");
- TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName);
- return getOutputsGraphForId(typeIdPair.right);
- }
-
- /**
- * Return the lineage inputs graph for the given tableName.
- *
- * @param tableName tableName
- * @return Inputs Graph as JSON
- */
- @Override
- @GraphTransaction
- public String getInputsGraph(String tableName) throws AtlasException {
- LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
- tableName = ParamChecker.notEmpty(tableName, "table name");
- TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(tableName);
- return getInputsGraphForId(typeIdPair.right);
- }
-
- @Override
- @GraphTransaction
- public String getInputsGraphForEntity(String guid) throws AtlasException {
- LOG.info("Fetching lineage inputs graph for entity={}", guid);
- guid = ParamChecker.notEmpty(guid, "Entity id");
- validateDatasetExists(guid);
- return getInputsGraphForId(guid);
- }
-
- private String getInputsGraphForId(String guid) {
- InputLineageClosureQuery
- inputsQuery = new InputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID,
- guid, HIVE_PROCESS_TYPE_NAME,
- HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
- SELECT_ATTRIBUTES, true, graphPersistenceStrategy, graph);
- GremlinQueryResult result = inputsQuery.evaluate();
- return inputsQuery.graph(result).toInstanceJson();
- }
-
- @Override
- @GraphTransaction
- public String getOutputsGraphForEntity(String guid) throws AtlasException {
- LOG.info("Fetching lineage outputs graph for entity guid={}", guid);
- guid = ParamChecker.notEmpty(guid, "Entity id");
- validateDatasetExists(guid);
- return getOutputsGraphForId(guid);
- }
-
- private String getOutputsGraphForId(String guid) {
- OutputLineageClosureQuery outputsQuery =
- new OutputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID, guid, HIVE_PROCESS_TYPE_NAME,
- HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
- SELECT_ATTRIBUTES, true, graphPersistenceStrategy, graph);
- GremlinQueryResult result = outputsQuery.evaluate();
- return outputsQuery.graph(result).toInstanceJson();
- }
-
- /**
- * Return the schema for the given tableName.
- *
- * @param datasetName tableName
- * @return Schema as JSON
- */
- @Override
- @GraphTransaction
- public String getSchema(String datasetName) throws AtlasException {
- datasetName = ParamChecker.notEmpty(datasetName, "table name");
- LOG.info("Fetching schema for tableName={}", datasetName);
- TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName);
-
- return getSchemaForId(typeIdPair.left, typeIdPair.right);
- }
-
- private String getSchemaForId(String typeName, String guid) throws DiscoveryException, SchemaNotFoundException {
- String configName = DATASET_SCHEMA_QUERY_PREFIX + typeName;
- if (propertiesConf.getString(configName) != null) {
- final String schemaQuery =
- String.format(propertiesConf.getString(configName), guid);
- int limit = AtlasConfiguration.SEARCH_MAX_LIMIT.getInt();
- return discoveryService.searchByDSL(schemaQuery, new QueryParams(limit, 0));
- }
- throw new SchemaNotFoundException("Schema is not configured for type " + typeName + ". Configure " + configName);
- }
-
- @Override
- @GraphTransaction
- public String getSchemaForEntity(String guid) throws AtlasException {
- guid = ParamChecker.notEmpty(guid, "Entity id");
- LOG.info("Fetching schema for entity guid={}", guid);
- String typeName = validateDatasetExists(guid);
- return getSchemaForId(typeName, guid);
- }
-
- /**
- * Validate if indeed this is a table type and exists.
- *
- * @param datasetName table name
- */
- private TypeUtils.Pair<String, String> validateDatasetNameExists(String datasetName) throws AtlasException {
- Iterator<AtlasVertex> results = graph.query().has("Referenceable.qualifiedName", datasetName)
- .has(Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name())
- .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE)
- .vertices().iterator();
- while (results.hasNext()) {
- AtlasVertex vertex = results.next();
- return TypeUtils.Pair.of(GraphHelper.getTypeName(vertex), GraphHelper.getGuid(vertex));
- }
- throw new EntityNotFoundException("Dataset with name = " + datasetName + " does not exist");
- }
-
- /**
- * Validate if indeed this is a table type and exists.
- *
- * @param guid entity id
- */
- private String validateDatasetExists(String guid) throws AtlasException {
- for (AtlasVertex vertex : (Iterable<AtlasVertex>) graph.query().has(Constants.GUID_PROPERTY_KEY, guid)
- .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE)
- .vertices()) {
- return GraphHelper.getTypeName(vertex);
- }
- throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/src/main/java/org/apache/atlas/discovery/DiscoveryService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/DiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/DiscoveryService.java
deleted file mode 100644
index e86047e..0000000
--- a/repository/src/main/java/org/apache/atlas/discovery/DiscoveryService.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.discovery;
-
-import org.apache.atlas.query.QueryParams;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Metadata discovery service.
- */
-public interface DiscoveryService {
-
- /**
- * Searches using Full text query
- * @param query query string
- * @param queryParams Default query parameters like limit, offset
- * @return results json
- * @throws DiscoveryException
- */
- String searchByFullText(String query, QueryParams queryParams) throws DiscoveryException;
-
- /**
- * Searches using DSL query
- * @param dslQuery query string
- * @param queryParams Default query parameters like limit, offset
- * @return results json
- * @throws DiscoveryException
- */
- String searchByDSL(String dslQuery, QueryParams queryParams) throws DiscoveryException;
-
- /**
- * Assumes the User is familiar with the persistence structure of the Repository.
- * The given query is run uninterpreted against the underlying Graph Store.
- * The results are returned as a List of Rows. each row is a Map of Key,Value pairs.
- *
- * @param gremlinQuery query in gremlin dsl format
- * @return List of Maps
- * @throws org.apache.atlas.discovery.DiscoveryException
- */
- List<Map<String, String>> searchByGremlin(String gremlinQuery) throws DiscoveryException;
-}