You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/11/06 22:50:07 UTC
[3/3] atlas git commit: ATLAS-2251: notification module updates
ATLAS-2251: notification module updates
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/64e739da
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/64e739da
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/64e739da
Branch: refs/heads/ATLAS-2251
Commit: 64e739da7e3bb36398c5ceb966016b435ae76a00
Parents: 3f44770
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Sun Nov 5 15:05:12 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Mon Nov 6 14:49:49 2017 -0800
----------------------------------------------------------------------
.../apache/atlas/falcon/hook/FalconHook.java | 2 +-
.../atlas/hbase/bridge/HBaseAtlasHook.java | 2 +-
.../hbase/model/HBaseOperationContext.java | 2 +-
.../org/apache/atlas/hive/hook/HiveHook.java | 2 +-
.../org/apache/atlas/sqoop/hook/SqoopHook.java | 2 +-
.../org/apache/atlas/AtlasConfiguration.java | 83 ----
.../org/apache/atlas/AtlasConfiguration.java | 83 ++++
.../AtlasNotificationBaseMessage.java | 208 ++++++++++
.../notification/AtlasNotificationMessage.java | 101 +++++
.../AtlasNotificationStringMessage.java | 82 ++++
.../model/notification/MessageVersion.java | 170 ++++++++
.../java/org/apache/atlas/type/AtlasType.java | 62 ++-
.../atlas/typesystem/types/DataTypes.java | 4 +-
.../model/instance/AtlasSystemAttributes.java | 21 +
.../org/apache/atlas/v1/model/instance/Id.java | 19 +
.../atlas/v1/model/instance/Referenceable.java | 27 ++
.../apache/atlas/v1/model/instance/Struct.java | 21 +
.../model/notification/EntityNotification.java | 218 ++++++++++
.../v1/model/notification/HookNotification.java | 415 +++++++++++++++++++
.../apache/atlas/v1/model/typedef/TypesDef.java | 27 ++
.../java/org/apache/atlas/hook/AtlasHook.java | 2 +-
.../apache/atlas/kafka/AtlasKafkaConsumer.java | 18 +-
.../apache/atlas/kafka/KafkaNotification.java | 5 +-
.../AbstractMessageDeserializer.java | 129 +-----
.../notification/AbstractNotification.java | 28 +-
.../AbstractNotificationConsumer.java | 19 +-
.../AtlasNotificationBaseMessage.java | 194 ---------
.../notification/AtlasNotificationMessage.java | 87 ----
.../AtlasNotificationMessageDeserializer.java | 59 +--
.../AtlasNotificationStringMessage.java | 66 ---
.../atlas/notification/MessageVersion.java | 141 -------
.../notification/NotificationInterface.java | 39 +-
.../notification/SplitMessageAggregator.java | 2 +
.../entity/EntityMessageDeserializer.java | 38 +-
.../notification/entity/EntityNotification.java | 66 ---
.../entity/EntityNotificationImpl.java | 157 -------
.../hook/HookMessageDeserializer.java | 19 +-
.../notification/hook/HookNotification.java | 275 ------------
.../org/apache/atlas/hook/AtlasHookTest.java | 2 +-
.../apache/atlas/kafka/KafkaConsumerTest.java | 17 +-
.../atlas/kafka/KafkaNotificationTest.java | 4 +-
.../AbstractNotificationConsumerTest.java | 103 +++--
.../notification/AbstractNotificationTest.java | 7 +-
.../AtlasNotificationMessageTest.java | 2 +
.../atlas/notification/MessageVersionTest.java | 1 +
.../SplitMessageAggregatorTest.java | 3 +-
.../entity/EntityMessageDeserializerTest.java | 12 +-
.../entity/EntityNotificationImplTest.java | 147 -------
.../entity/EntityNotificationTest.java | 148 +++++++
.../hook/HookMessageDeserializerTest.java | 16 +-
.../notification/hook/HookNotificationTest.java | 12 +-
.../test/java/org/apache/atlas/DBSandboxer.java | 53 +++
.../NotificationEntityChangeListener.java | 27 +-
.../notification/NotificationHookConsumer.java | 16 +-
.../notification/EntityNotificationIT.java | 2 +-
.../NotificationHookConsumerIT.java | 12 +-
.../NotificationHookConsumerKafkaTest.java | 4 +-
.../NotificationHookConsumerTest.java | 2 +-
.../atlas/web/integration/BaseResourceIT.java | 2 +-
59 files changed, 1893 insertions(+), 1594 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index e9b9765..5912cb0 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -26,7 +26,7 @@ import org.apache.atlas.falcon.publisher.FalconEventPublisher;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotification;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.feed.Feed;
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
index af5eda8..6fcaf1b 100644
--- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
+++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
@@ -24,7 +24,7 @@ import org.apache.atlas.hbase.model.HBaseOperationContext;
import org.apache.atlas.hbase.model.HBaseDataTypes;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotification;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/model/HBaseOperationContext.java
----------------------------------------------------------------------
diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/model/HBaseOperationContext.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/model/HBaseOperationContext.java
index ce0f212..33858d4 100644
--- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/model/HBaseOperationContext.java
+++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/model/HBaseOperationContext.java
@@ -19,7 +19,7 @@
package org.apache.atlas.hbase.model;
import org.apache.atlas.hbase.bridge.HBaseAtlasHook;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotification;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 9a5d1b9..5f8dcdb 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -29,7 +29,7 @@ import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.hook.AtlasHookException;
import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotification;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 6c1aa99..aee24ab 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -27,7 +27,7 @@ import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.hook.AtlasHookException;
import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotification;
import org.apache.atlas.sqoop.model.SqoopDataTypes;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
deleted file mode 100644
index bd2bf7f..0000000
--- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas;
-
-import org.apache.commons.configuration.Configuration;
-
-/**
- * Enum that encapsulated each property name and its default value.
- */
-public enum AtlasConfiguration {
- //web server configuration
- WEBSERVER_MIN_THREADS("atlas.webserver.minthreads", 10),
- WEBSERVER_MAX_THREADS("atlas.webserver.maxthreads", 100),
- WEBSERVER_KEEPALIVE_SECONDS("atlas.webserver.keepalivetimesecs", 60),
- WEBSERVER_QUEUE_SIZE("atlas.webserver.queuesize", 100),
- WEBSERVER_REQUEST_BUFFER_SIZE("atlas.jetty.request.buffer.size", 16192),
-
- QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024),
-
- NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
- NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
- NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
- NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds", 5 * 60),
-
- //search configuration
- SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
- SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
-
- private static final Configuration APPLICATION_PROPERTIES;
-
- static {
- try {
- APPLICATION_PROPERTIES = ApplicationProperties.get();
- } catch (AtlasException e) {
- throw new RuntimeException(e);
- }
- }
-
- private final String propertyName;
- private final Object defaultValue;
-
- AtlasConfiguration(String propertyName, Object defaultValue) {
- this.propertyName = propertyName;
- this.defaultValue = defaultValue;
- }
-
- public int getInt() {
- return APPLICATION_PROPERTIES.getInt(propertyName, Integer.valueOf(defaultValue.toString()).intValue());
- }
-
- public long getLong() {
- return APPLICATION_PROPERTIES.getLong(propertyName, Long.valueOf(defaultValue.toString()).longValue());
- }
-
- public boolean getBoolean() {
- return APPLICATION_PROPERTIES.getBoolean(propertyName, Boolean.valueOf(defaultValue.toString()).booleanValue());
- }
-
- public String getString() {
- return APPLICATION_PROPERTIES.getString(propertyName, defaultValue.toString());
- }
-
- public Object get() {
- Object value = APPLICATION_PROPERTIES.getProperty(propertyName);
- return value == null ? defaultValue : value;
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
new file mode 100644
index 0000000..bd2bf7f
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas;
+
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Enum that encapsulates each property name and its default value.
+ */
+public enum AtlasConfiguration {
+ //web server configuration
+ WEBSERVER_MIN_THREADS("atlas.webserver.minthreads", 10),
+ WEBSERVER_MAX_THREADS("atlas.webserver.maxthreads", 100),
+ WEBSERVER_KEEPALIVE_SECONDS("atlas.webserver.keepalivetimesecs", 60),
+ WEBSERVER_QUEUE_SIZE("atlas.webserver.queuesize", 100),
+ WEBSERVER_REQUEST_BUFFER_SIZE("atlas.jetty.request.buffer.size", 16192),
+
+ QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024),
+
+ NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
+ NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
+ NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
+ NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds", 5 * 60),
+
+ //search configuration
+ SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000),
+ SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100);
+
+ private static final Configuration APPLICATION_PROPERTIES;
+
+ static {
+ try {
+ APPLICATION_PROPERTIES = ApplicationProperties.get();
+ } catch (AtlasException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private final String propertyName;
+ private final Object defaultValue;
+
+ AtlasConfiguration(String propertyName, Object defaultValue) {
+ this.propertyName = propertyName;
+ this.defaultValue = defaultValue;
+ }
+
+ public int getInt() {
+ return APPLICATION_PROPERTIES.getInt(propertyName, Integer.valueOf(defaultValue.toString()).intValue());
+ }
+
+ public long getLong() {
+ return APPLICATION_PROPERTIES.getLong(propertyName, Long.valueOf(defaultValue.toString()).longValue());
+ }
+
+ public boolean getBoolean() {
+ return APPLICATION_PROPERTIES.getBoolean(propertyName, Boolean.valueOf(defaultValue.toString()).booleanValue());
+ }
+
+ public String getString() {
+ return APPLICATION_PROPERTIES.getString(propertyName, defaultValue.toString());
+ }
+
+ public Object get() {
+ Object value = APPLICATION_PROPERTIES.getProperty(propertyName);
+ return value == null ? defaultValue : value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java
new file mode 100644
index 0000000..2411808
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.notification;
+
+
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.commons.compress.utils.IOUtils;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class AtlasNotificationBaseMessage {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationBaseMessage.class);
+
+ public static final int MESSAGE_MAX_LENGTH_BYTES = AtlasConfiguration.NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES.getInt() - 512; // 512 bytes reserved for the envelope
+ public static final boolean MESSAGE_COMPRESSION_ENABLED = AtlasConfiguration.NOTIFICATION_MESSAGE_COMPRESSION_ENABLED.getBoolean();
+
+ public enum CompressionKind { NONE, GZIP };
+
+ private MessageVersion version = null;
+ private String msgId = null;
+ private CompressionKind msgCompressionKind = CompressionKind.NONE;
+ private int msgSplitIdx = 1;
+ private int msgSplitCount = 1;
+
+
+ public AtlasNotificationBaseMessage() {
+ }
+
+ public AtlasNotificationBaseMessage(MessageVersion version) {
+ this(version, null, CompressionKind.NONE);
+ }
+
+ public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind) {
+ this.version = version;
+ this.msgId = msgId;
+ this.msgCompressionKind = msgCompressionKind;
+ }
+
+ public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) {
+ this.version = version;
+ this.msgId = msgId;
+ this.msgCompressionKind = msgCompressionKind;
+ this.msgSplitIdx = msgSplitIdx;
+ this.msgSplitCount = msgSplitCount;
+ }
+
+ public void setVersion(MessageVersion version) {
+ this.version = version;
+ }
+
+ public MessageVersion getVersion() {
+ return version;
+ }
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+ public void setMsgId(String msgId) {
+ this.msgId = msgId;
+ }
+
+ public CompressionKind getMsgCompressionKind() {
+ return msgCompressionKind;
+ }
+
+ public void setMsgCompressed(CompressionKind msgCompressionKind) {
+ this.msgCompressionKind = msgCompressionKind;
+ }
+
+ public int getMsgSplitIdx() {
+ return msgSplitIdx;
+ }
+
+ public void setMsgSplitIdx(int msgSplitIdx) {
+ this.msgSplitIdx = msgSplitIdx;
+ }
+
+ public int getMsgSplitCount() {
+ return msgSplitCount;
+ }
+
+ public void setMsgSplitCount(int msgSplitCount) {
+ this.msgSplitCount = msgSplitCount;
+ }
+
+ /**
+ * Compare the version of this message with the given version.
+ *
+ * @param compareToVersion the version to compare to
+ *
+ * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to,
+ * or greater than the given version.
+ */
+ public int compareVersion(MessageVersion compareToVersion) {
+ return version.compareTo(compareToVersion);
+ }
+
+
+ public static byte[] getBytesUtf8(String str) {
+ return StringUtils.getBytesUtf8(str);
+ }
+
+ public static String getStringUtf8(byte[] bytes) {
+ return StringUtils.newStringUtf8(bytes);
+ }
+
+ public static byte[] encodeBase64(byte[] bytes) {
+ return Base64.encodeBase64(bytes);
+ }
+
+ public static byte[] decodeBase64(byte[] bytes) {
+ return Base64.decodeBase64(bytes);
+ }
+
+ public static byte[] gzipCompressAndEncodeBase64(byte[] bytes) {
+ return encodeBase64(gzipCompress(bytes));
+ }
+
+ public static byte[] decodeBase64AndGzipUncompress(byte[] bytes) {
+ return gzipUncompress(decodeBase64(bytes));
+ }
+
+ public static String gzipCompress(String str) {
+ byte[] bytes = getBytesUtf8(str);
+ byte[] compressedBytes = gzipCompress(bytes);
+ byte[] encodedBytes = encodeBase64(compressedBytes);
+
+ return getStringUtf8(encodedBytes);
+ }
+
+ public static String gzipUncompress(String str) {
+ byte[] encodedBytes = getBytesUtf8(str);
+ byte[] compressedBytes = decodeBase64(encodedBytes);
+ byte[] bytes = gzipUncompress(compressedBytes);
+
+ return getStringUtf8(bytes);
+ }
+
+ public static byte[] gzipCompress(byte[] content) {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+
+ try {
+ GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
+
+ gzipOutputStream.write(content);
+ gzipOutputStream.close();
+ } catch (IOException e) {
+ LOG.error("gzipCompress(): error compressing {} bytes", content.length, e);
+
+ throw new RuntimeException(e);
+ }
+
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ public static byte[] gzipUncompress(byte[] content) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ try {
+ IOUtils.copy(new GZIPInputStream(new ByteArrayInputStream(content)), out);
+ } catch (IOException e) {
+ LOG.error("gzipUncompress(): error uncompressing {} bytes", content.length, e);
+ }
+
+ return out.toByteArray();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
new file mode 100644
index 0000000..5a5b63f
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.notification;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Date;
+
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Represents a notification message that is associated with a version.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
+ private String msgSourceIP;
+ private String msgCreatedBy;
+ private long msgCreationTime;
+
+ /**
+ * The actual message.
+ */
+ private T message;
+
+
+ // ----- Constructors ----------------------------------------------------
+ public AtlasNotificationMessage() {
+ }
+
+ public AtlasNotificationMessage(MessageVersion version, T message) {
+ this(version, message, null, null);
+ }
+
+ public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) {
+ super(version);
+
+ this.msgSourceIP = msgSourceIP;
+ this.msgCreatedBy = createdBy;
+ this.msgCreationTime = (new Date()).getTime();
+ this.message = message;
+ }
+
+
+ public String getMsgSourceIP() {
+ return msgSourceIP;
+ }
+
+ public void setMsgSourceIP(String msgSourceIP) {
+ this.msgSourceIP = msgSourceIP;
+ }
+
+ public String getMsgCreatedBy() {
+ return msgCreatedBy;
+ }
+
+ public void setMsgCreatedBy(String msgCreatedBy) {
+ this.msgCreatedBy = msgCreatedBy;
+ }
+
+ public long getMsgCreationTime() {
+ return msgCreationTime;
+ }
+
+ public void setMsgCreationTime(long msgCreationTime) {
+ this.msgCreationTime = msgCreationTime;
+ }
+
+ public T getMessage() {
+ return message;
+ }
+
+ public void setMessage(T message) {
+ this.message = message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationStringMessage.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationStringMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationStringMessage.java
new file mode 100644
index 0000000..9064b6c
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationStringMessage.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.notification;
+
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage {
+ private String message = null;
+
+ public AtlasNotificationStringMessage() {
+ super(MessageVersion.CURRENT_VERSION);
+ }
+
+ public AtlasNotificationStringMessage(String message) {
+ super(MessageVersion.CURRENT_VERSION);
+
+ this.message = message;
+ }
+
+ public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind) {
+ super(MessageVersion.CURRENT_VERSION, msgId, compressionKind);
+
+ this.message = message;
+ }
+
+ public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
+ super(MessageVersion.CURRENT_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount);
+
+ this.message = message;
+ }
+
+ public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) {
+ super(MessageVersion.CURRENT_VERSION, msgId, compressionKind);
+
+ this.message = AtlasNotificationBaseMessage.getStringUtf8(encodedBytes);
+ }
+
+ public AtlasNotificationStringMessage(byte[] encodedBytes, int offset, int length, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
+ super(MessageVersion.CURRENT_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount);
+
+ this.message = new String(encodedBytes, offset, length);
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/model/notification/MessageVersion.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/notification/MessageVersion.java b/intg/src/main/java/org/apache/atlas/model/notification/MessageVersion.java
new file mode 100644
index 0000000..1dafa94
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/notification/MessageVersion.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.notification;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Represents the version of a notification message.
+ *
+ * The version is a dot-separated list of integers. Trailing zero parts are ignored for
+ * comparison, equality and hashing, so "1.0.0" and "1" denote the same version.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class MessageVersion implements Comparable<MessageVersion>, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Used for message with no version (old format).
+     */
+    public static final MessageVersion NO_VERSION = new MessageVersion("0");
+    public static final MessageVersion VERSION_1  = new MessageVersion("1.0.0");
+
+    public static final MessageVersion CURRENT_VERSION = VERSION_1;
+
+    private String version;
+
+
+    // ----- Constructors ----------------------------------------------------
+
+    /**
+     * Create a message version equal to {@link #CURRENT_VERSION}.
+     */
+    public MessageVersion() {
+        this.version = CURRENT_VERSION.version;
+    }
+
+    /**
+     * Create a message version.
+     *
+     * @param version the version string
+     *
+     * @throws IllegalArgumentException if any dot-separated part is not an integer
+     */
+    public MessageVersion(String version) {
+        this.version = version;
+
+        try {
+            // parse eagerly so that a malformed version is rejected at construction time
+            getVersionParts();
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(String.format("Invalid version string : %s.", version), e);
+        }
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+
+    // ----- Comparable ------------------------------------------------------
+
+    /**
+     * Compare versions part by part, treating missing parts as zero.
+     *
+     * @param that the version to compare with; null sorts before any version
+     *
+     * @return a negative value, zero or a positive value when this version is
+     *         lower than, equal to or higher than the given version
+     */
+    @Override
+    public int compareTo(MessageVersion that) {
+        if (that == null) {
+            return 1;
+        }
+
+        Integer[] thisParts = getVersionParts();
+        Integer[] thatParts = that.getVersionParts();
+
+        int length = Math.max(thisParts.length, thatParts.length);
+
+        for (int i = 0; i < length; i++) {
+            // Integer.compare instead of subtraction: the difference of two ints can overflow
+            // and flip the sign of the result
+            int comp = Integer.compare(getVersionPart(thisParts, i), getVersionPart(thatParts, i));
+
+            if (comp != 0) {
+                return comp;
+            }
+        }
+        return 0;
+    }
+
+
+    // ----- Object overrides ------------------------------------------------
+
+    @Override
+    public boolean equals(Object that) {
+        if (this == that){
+            return true;
+        }
+
+        if (that == null || getClass() != that.getClass()) {
+            return false;
+        }
+
+        return compareTo((MessageVersion) that) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+        // hash the normalized parts so that versions equal per compareTo() hash alike
+        return Arrays.hashCode(getVersionParts());
+    }
+
+
+    @Override
+    public String toString() {
+        return "MessageVersion[version=" + version + "]";
+    }
+
+    // ----- helper methods --------------------------------------------------
+
+    /**
+     * Get the version parts array by splitting the version string.
+     * Strip the trailing zeros (i.e. '1.0.0' equals '1').
+     *
+     * @return the version parts array
+     *
+     * @throws NumberFormatException if any part of the version string is not an integer
+     */
+    public Integer[] getVersionParts() {
+
+        String[]           sParts       = version.split("\\.");
+        ArrayList<Integer> iParts       = new ArrayList<>();
+        int                pendingZeros = 0;
+
+        for (String sPart : sParts) {
+            // parseInt replaces the deprecated 'new Integer(String)'; same parsing rules
+            int iPart = Integer.parseInt(sPart);
+
+            if (iPart == 0) {
+                // hold zeros back: they are emitted only if a non-zero part follows
+                ++pendingZeros;
+            } else {
+                for (int i = 0; i < pendingZeros; ++i) {
+                    iParts.add(0);
+                }
+                pendingZeros = 0;
+                iParts.add(iPart);
+            }
+        }
+        return iParts.toArray(new Integer[iParts.size()]);
+    }
+
+    /**
+     * Get the version part at the given index.
+     *
+     * @param versionParts the version parts array
+     * @param i            the index of the requested part
+     *
+     * @return the part at the index, or 0 when the index is past the end of the array
+     */
+    public Integer getVersionPart(Integer[] versionParts, int i) {
+        return i < versionParts.length ? versionParts[i] : 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/type/AtlasType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasType.java b/intg/src/main/java/org/apache/atlas/type/AtlasType.java
index 551ee21..1b09b93 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasType.java
@@ -21,10 +21,19 @@ package org.apache.atlas.type;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.v1.model.notification.HookNotification.EntityCreateRequest;
+import org.apache.atlas.v1.model.notification.HookNotification.EntityDeleteRequest;
+import org.apache.atlas.v1.model.notification.HookNotification.EntityPartialUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotification.EntityUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
+import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationType;
+import org.apache.atlas.v1.model.notification.HookNotification.TypeRequest;
import org.apache.atlas.v1.model.typedef.Multiplicity;
import org.codehaus.jackson.*;
import org.codehaus.jackson.map.*;
import org.codehaus.jackson.map.module.SimpleModule;
+import org.codehaus.jackson.node.ObjectNode;
+import org.codehaus.jackson.type.TypeReference;
import java.io.IOException;
import java.text.ParseException;
@@ -32,8 +41,6 @@ import java.util.Date;
import java.util.List;
-
-
/**
* base class that declares interface for all Atlas types.
*/
@@ -53,6 +60,7 @@ public abstract class AtlasType {
atlasSerDeModule.addDeserializer(Date.class, new DateDeserializer());
atlasSerDeModule.addSerializer(Multiplicity.class, new MultiplicitySerializer());
atlasSerDeModule.addDeserializer(Multiplicity.class, new MultiplicityDeserializer());
+ atlasSerDeModule.addDeserializer(HookNotificationMessage.class, new HookMessageDeserializer());
mapperV1.registerModule(atlasSerDeModule);
}
@@ -148,6 +156,7 @@ public abstract class AtlasType {
try {
ret = mapperV1.writeValueAsString(obj);
}catch (IOException e){
+ e.printStackTrace(System.out);
ret = null;
}
return ret;
@@ -163,6 +172,16 @@ public abstract class AtlasType {
return ret;
}
+    public static <T> T fromV1Json(String jsonStr, TypeReference<T> type) {
+        // best-effort parse with the V1 mapper: an IOException yields null instead of propagating
+        try {
+            return mapperV1.readValue(jsonStr, type);
+        } catch (IOException e) {
+            return null;
+        }
+    }
+
static class DateSerializer extends JsonSerializer<Date> {
@Override
public void serialize(Date value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
@@ -233,4 +252,43 @@ public abstract class AtlasType {
return ret;
}
}
+
+    /**
+     * Deserializes a hook notification into the concrete request class selected by the
+     * value of its "type" field.
+     */
+    static class HookMessageDeserializer extends JsonDeserializer<HookNotificationMessage> {
+        /**
+         * Reads the JSON tree and binds it to the request class matching its "type" field.
+         *
+         * @return the deserialized message; null when there is no content or the "type"
+         *         field is missing or JSON null
+         * @throws IOException              on parse or binding failure
+         * @throws IllegalArgumentException if the "type" value does not name a HookNotificationType
+         */
+        @Override
+        public HookNotificationMessage deserialize(JsonParser parser, DeserializationContext context) throws IOException {
+            ObjectMapper mapper   = (ObjectMapper) parser.getCodec();
+            // kept as JsonNode (no ObjectNode cast) so that non-object input cannot cause a ClassCastException
+            JsonNode     root     = mapper.readTree(parser);
+            JsonNode     typeNode = root != null ? root.get("type") : null;
+
+            // a missing field gives null; an explicit JSON null would otherwise turn into the text "null"
+            if (typeNode == null || typeNode.isNull()) {
+                return null;
+            }
+
+            HookNotificationType notificationType = HookNotificationType.valueOf(typeNode.asText());
+
+            switch (notificationType) {
+                case TYPE_CREATE:
+                case TYPE_UPDATE:
+                    return mapper.readValue(root, TypeRequest.class);
+
+                case ENTITY_CREATE:
+                    return mapper.readValue(root, EntityCreateRequest.class);
+
+                case ENTITY_PARTIAL_UPDATE:
+                    return mapper.readValue(root, EntityPartialUpdateRequest.class);
+
+                case ENTITY_FULL_UPDATE:
+                    return mapper.readValue(root, EntityUpdateRequest.class);
+
+                case ENTITY_DELETE:
+                    return mapper.readValue(root, EntityDeleteRequest.class);
+
+                default:
+                    // enum value without a matching request class
+                    return null;
+            }
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/typesystem/types/DataTypes.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/typesystem/types/DataTypes.java b/intg/src/main/java/org/apache/atlas/typesystem/types/DataTypes.java
index 804ad33..dba2d88 100644
--- a/intg/src/main/java/org/apache/atlas/typesystem/types/DataTypes.java
+++ b/intg/src/main/java/org/apache/atlas/typesystem/types/DataTypes.java
@@ -18,7 +18,9 @@
package org.apache.atlas.typesystem.types;
-
+/*
+ * this enum must be in package org.apache.atlas.typesystem.types, since vertex property in GraphDB has reference to this type
+ */
public class DataTypes {
public enum TypeCategory {
PRIMITIVE,
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/v1/model/instance/AtlasSystemAttributes.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/instance/AtlasSystemAttributes.java b/intg/src/main/java/org/apache/atlas/v1/model/instance/AtlasSystemAttributes.java
index 43eca0b..fba22cc 100644
--- a/intg/src/main/java/org/apache/atlas/v1/model/instance/AtlasSystemAttributes.java
+++ b/intg/src/main/java/org/apache/atlas/v1/model/instance/AtlasSystemAttributes.java
@@ -120,4 +120,25 @@ public class AtlasSystemAttributes implements Serializable {
public int hashCode() {
return Objects.hash(createdBy, modifiedBy, createdTime, modifiedTime);
}
+
+
+ @Override
+ public String toString() {
+ return toString(new StringBuilder()).toString();
+ }
+
+ public StringBuilder toString(StringBuilder sb) {
+ if (sb == null) {
+ sb = new StringBuilder();
+ }
+
+ sb.append("AtlasSystemAttributes{")
+ .append("createdBy=").append(createdBy)
+ .append(", modifiedBy=").append(modifiedBy)
+ .append(", createdTime=").append(createdTime)
+ .append(", modifiedTime=").append(modifiedTime)
+ .append("}");
+
+ return sb;
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/v1/model/instance/Id.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/instance/Id.java b/intg/src/main/java/org/apache/atlas/v1/model/instance/Id.java
index 1b250f0..cd2951e 100644
--- a/intg/src/main/java/org/apache/atlas/v1/model/instance/Id.java
+++ b/intg/src/main/java/org/apache/atlas/v1/model/instance/Id.java
@@ -156,6 +156,25 @@ public class Id implements Serializable {
}
+ @Override
+ public String toString() {
+ return toString(new StringBuilder()).toString();
+ }
+
+ public StringBuilder toString(StringBuilder sb) {
+ if (sb == null) {
+ sb = new StringBuilder();
+ }
+
+ sb.append("Id{")
+ .append("id=").append(id)
+ .append(", typeName=").append(typeName)
+ .append(", version=").append(version)
+ .append(", state=").append(state)
+ .append("}");
+
+ return sb;
+ }
private static long nextNegativeLong() {
long ret = s_nextId.getAndDecrement();
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/v1/model/instance/Referenceable.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/instance/Referenceable.java b/intg/src/main/java/org/apache/atlas/v1/model/instance/Referenceable.java
index 44b7a5c..158da45 100644
--- a/intg/src/main/java/org/apache/atlas/v1/model/instance/Referenceable.java
+++ b/intg/src/main/java/org/apache/atlas/v1/model/instance/Referenceable.java
@@ -20,6 +20,7 @@ package org.apache.atlas.v1.model.instance;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
@@ -198,4 +199,30 @@ public class Referenceable extends Struct implements Serializable {
public int hashCode() {
return Objects.hash(id, traits, traitNames, systemAttributes);
}
+
+
+    /**
+     * Appends a textual dump of this referenceable to the given builder: the inherited
+     * struct dump followed by id, traits, trait names and system attributes.
+     *
+     * @param sb builder to append to; a new one is created when null
+     * @return the builder that was appended to
+     */
+    @Override
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("Referenceable{");
+        super.toString(sb);
+        sb.append(", id=");
+        if (id != null) {
+            id.toString(sb);
+        }
+        // label previously misspelled as "triats"
+        sb.append(", traits={");
+        AtlasBaseTypeDef.dumpObjects(this.traits, sb);
+        sb.append("}, traitNames=[");
+        AtlasBaseTypeDef.dumpObjects(traitNames, sb);
+        sb.append("], systemAttributes=");
+        if (systemAttributes != null) {
+            systemAttributes.toString(sb);
+        }
+        sb.append("}");
+
+        return sb;
+    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java b/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java
index 53e00ca..5fa4080 100644
--- a/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java
+++ b/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java
@@ -19,6 +19,7 @@
package org.apache.atlas.v1.model.instance;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
@@ -138,4 +139,24 @@ public class Struct implements Serializable {
public int hashCode() {
return Objects.hash(typeName, values);
}
+
+ @Override
+ public String toString() {
+ return toString(new StringBuilder()).toString();
+ }
+
+ public StringBuilder toString(StringBuilder sb) {
+ if (sb == null) {
+ sb = new StringBuilder();
+ }
+
+ sb.append("Struct{");
+ sb.append("typeName=").append(typeName);
+ sb.append(", values={");
+ AtlasBaseTypeDef.dumpObjects(values, sb);
+ sb.append("}");
+ sb.append("}");
+
+ return sb;
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotification.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotification.java b/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotification.java
new file mode 100644
index 0000000..e4305dd
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotification.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.v1.model.notification;
+
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Entity notification: the entity concerned, the operation performed on it and the
+ * traits associated with it.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class EntityNotification implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Operations that can be reported by an entity notification.
+     */
+    public enum OperationType {
+        ENTITY_CREATE,
+        ENTITY_UPDATE,
+        ENTITY_DELETE,
+        TRAIT_ADD,
+        TRAIT_DELETE,
+        TRAIT_UPDATE
+    }
+
+    private Referenceable entity;
+    private OperationType operationType;
+    private List<Struct>  traits;
+
+
+    // ----- Constructors ------------------------------------------------------
+
+    /**
+     * No-arg constructor for serialization.
+     */
+    public EntityNotification() {
+    }
+
+    /**
+     * Construct an EntityNotification.
+     *
+     * @param entity            the entity subject of the notification
+     * @param operationType     the type of operation that caused the notification
+     * @param traits            the traits for the given entity
+     */
+    public EntityNotification(Referenceable entity, OperationType operationType, List<Struct> traits) {
+        this.entity        = entity;
+        this.operationType = operationType;
+        this.traits        = traits;
+    }
+
+    /**
+     * Construct an EntityNotification whose traits are derived from the entity: each trait
+     * of the entity plus one trait per super-type of that trait.
+     *
+     * @param entity         the entity subject of the notification
+     * @param operationType  the type of operation that caused the notification
+     * @param typeRegistry   the Atlas type system
+     */
+    public EntityNotification(Referenceable entity, OperationType operationType, AtlasTypeRegistry typeRegistry) {
+        this(entity, operationType, getAllTraits(entity, typeRegistry));
+    }
+
+    public Referenceable getEntity() {
+        return entity;
+    }
+
+    public void setEntity(Referenceable entity) {
+        this.entity = entity;
+    }
+
+    public OperationType getOperationType() {
+        return operationType;
+    }
+
+    public void setOperationType(OperationType operationType) {
+        this.operationType = operationType;
+    }
+
+    public List<Struct> getTraits() {
+        return traits;
+    }
+
+    public void setTraits(List<Struct> traits) {
+        this.traits = traits;
+    }
+
+    /**
+     * Same list as {@link #getTraits()}; excluded from JSON so it is not serialized twice.
+     */
+    @JsonIgnore
+    public List<Struct> getAllTraits() {
+        return traits;
+    }
+
+
+    // ----- Object overrides --------------------------------------------------
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        EntityNotification that = (EntityNotification) o;
+        return Objects.equals(entity, that.entity) &&
+                operationType == that.operationType &&
+                Objects.equals(traits, that.traits);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(entity, operationType, traits);
+    }
+
+    @Override
+    public String toString() {
+        return toString(new StringBuilder()).toString();
+    }
+
+    /**
+     * Appends a textual dump of this notification to the given builder.
+     *
+     * @param sb builder to append to; a new one is created when null
+     * @return the builder that was appended to
+     */
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("EntityNotification{");
+        sb.append("entity=");
+        if (entity != null) {
+            entity.toString(sb);
+        } else {
+            sb.append(entity);
+        }
+        sb.append(", operationType=").append(operationType);
+        sb.append(", traits=[");
+        AtlasBaseTypeDef.dumpObjects(traits, sb);
+        sb.append("]");
+        sb.append("}");
+
+        return sb;
+    }
+
+
+    // ----- helper methods ----------------------------------------------------
+
+    /**
+     * Collect the traits of the entity and, for each of them, one trait per super-type.
+     * A super-type trait carries only those values of the original trait whose attribute
+     * names are known to the super-type.
+     *
+     * @param entityDefinition the entity to read traits from; null yields an empty list
+     * @param typeRegistry     registry used to look up classification types; when null,
+     *                         no super-type traits are produced
+     *
+     * @return the list of traits; never null
+     */
+    private static List<Struct> getAllTraits(Referenceable entityDefinition, AtlasTypeRegistry typeRegistry) {
+        List<Struct> ret = new LinkedList<>();
+
+        // nothing to collect for a missing entity or an entity without trait names
+        if (entityDefinition == null || entityDefinition.getTraitNames() == null) {
+            return ret;
+        }
+
+        for (String traitName : entityDefinition.getTraitNames()) {
+            Struct                  trait          = entityDefinition.getTrait(traitName);
+            AtlasClassificationType traitType      = typeRegistry != null ? typeRegistry.getClassificationTypeByName(traitName) : null;
+            Set<String>             superTypeNames = traitType != null ? traitType.getAllSuperTypes() : null;
+            // a trait name may have no trait instance; guard before reading its values
+            Map<String, Object>     traitValues    = trait != null ? trait.getValues() : null;
+
+            ret.add(trait);
+
+            if (CollectionUtils.isNotEmpty(superTypeNames)) {
+                for (String superTypeName : superTypeNames) {
+                    Struct superTypeTrait = new Struct(superTypeName);
+
+                    if (MapUtils.isNotEmpty(traitValues)) {
+                        AtlasClassificationType superType           = typeRegistry.getClassificationTypeByName(superTypeName);
+                        // looked up once per super-type rather than once per attribute
+                        Map<String, ?>          superTypeAttributes = superType != null ? superType.getAllAttributes() : null;
+
+                        if (MapUtils.isNotEmpty(superTypeAttributes)) {
+                            Map<String, Object> superTypeTraitAttributes = new HashMap<>();
+
+                            for (Map.Entry<String, Object> attrEntry : traitValues.entrySet()) {
+                                String attrName = attrEntry.getKey();
+
+                                if (superTypeAttributes.containsKey(attrName)) {
+                                    superTypeTraitAttributes.put(attrName, attrEntry.getValue());
+                                }
+                            }
+
+                            superTypeTrait.setValues(superTypeTraitAttributes);
+                        }
+                    }
+
+                    ret.add(superTypeTrait);
+                }
+            }
+        }
+
+        return ret;
+    }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/v1/model/notification/HookNotification.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/notification/HookNotification.java b/intg/src/main/java/org/apache/atlas/v1/model/notification/HookNotification.java
new file mode 100644
index 0000000..7be5e0b
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/v1/model/notification/HookNotification.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.v1.model.notification;
+
+import com.google.gson.JsonParseException;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.typedef.TypesDef;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang.StringUtils;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
+import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Contains the structure of messages transferred from hooks to atlas.
+ */
+public class HookNotification {
+    /**
+     * Kinds of messages a hook can send.
+     */
+    public enum HookNotificationType {
+        TYPE_CREATE,
+        TYPE_UPDATE,
+        ENTITY_CREATE,
+        ENTITY_PARTIAL_UPDATE,
+        ENTITY_FULL_UPDATE,
+        ENTITY_DELETE
+    }
+
+ /**
+ * Base type of hook message.
+ */
+ @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+ @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+ @JsonIgnoreProperties(ignoreUnknown=true)
+ @XmlRootElement
+ @XmlAccessorType(XmlAccessType.PROPERTY)
+ public static class HookNotificationMessage implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public static final String UNKNOW_USER = "UNKNOWN";
+
+ protected HookNotificationType type;
+ protected String user;
+
+ public HookNotificationMessage() {
+ }
+
+ public HookNotificationMessage(HookNotificationType type, String user) {
+ this.type = type;
+ this.user = user;
+ }
+
+ public HookNotificationType getType() {
+ return type;
+ }
+
+ public void setType(HookNotificationType type) {
+ this.type = type;
+ }
+
+ public String getUser() {
+ if (StringUtils.isEmpty(user)) {
+ return UNKNOW_USER;
+ }
+
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ @Override
+ public String toString() {
+ return toString(new StringBuilder()).toString();
+ }
+
+ public StringBuilder toString(StringBuilder sb) {
+ if (sb == null) {
+ sb = new StringBuilder();
+ }
+
+ sb.append("HookNotificationMessage{");
+ sb.append("type=").append(type);
+ sb.append(", user=").append(user);
+ sb.append("}");
+
+ return sb;
+ }
+ }
+
+    /**
+     * Hook message carrying type definitions; handled for both the TYPE_CREATE and
+     * TYPE_UPDATE notification types.
+     */
+    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+    @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+    @JsonIgnoreProperties(ignoreUnknown=true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class TypeRequest extends HookNotificationMessage implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private TypesDef typesDef;
+
+        public TypeRequest() {
+        }
+
+        public TypeRequest(HookNotificationType type, TypesDef typesDef, String user) {
+            super(type, user);
+            this.typesDef = typesDef;
+        }
+
+        public TypesDef getTypesDef() {
+            return typesDef;
+        }
+
+        public void setTypesDef(TypesDef typesDef) {
+            this.typesDef = typesDef;
+        }
+
+        /**
+         * Appends a textual dump of this request to the given builder.
+         *
+         * @param sb builder to append to; a new one is created when null
+         * @return the builder that was appended to
+         */
+        @Override
+        public StringBuilder toString(StringBuilder sb) {
+            if (sb == null) {
+                sb = new StringBuilder();
+            }
+
+            sb.append("TypeRequest{");
+            super.toString(sb);
+            // separator was missing, which ran the base dump straight into "typesDef="
+            sb.append(", typesDef=");
+            if (typesDef != null) {
+                typesDef.toString(sb);
+            }
+            sb.append("}");
+
+            return sb;
+        }
+    }
+
+    /**
+     * Hook message for creating new entities.
+     */
+    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+    @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+    @JsonIgnoreProperties(ignoreUnknown=true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class EntityCreateRequest extends HookNotificationMessage implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private List<Referenceable> entities;
+
+        public EntityCreateRequest() {
+        }
+
+        public EntityCreateRequest(String user, Referenceable... entities) {
+            this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities), user);
+        }
+
+        public EntityCreateRequest(String user, List<Referenceable> entities) {
+            this(HookNotificationType.ENTITY_CREATE, entities, user);
+        }
+
+        /**
+         * Lets subclasses reuse the entity list under a different notification type.
+         */
+        protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities, String user) {
+            super(type, user);
+            this.entities = entities;
+        }
+
+        /**
+         * Builds the request from a JSON array whose elements are V1-JSON serialized entities.
+         *
+         * @param user      the user of the message
+         * @param jsonArray array of entity JSON strings
+         * @throws JsonParseException if an element cannot be read from the array
+         */
+        public EntityCreateRequest(String user, JSONArray jsonArray) {
+            super(HookNotificationType.ENTITY_CREATE, user);
+            entities = new ArrayList<>();
+            for (int index = 0; index < jsonArray.length(); index++) {
+                try {
+                    entities.add(AtlasType.fromV1Json(jsonArray.getString(index), Referenceable.class));
+                } catch (JSONException e) {
+                    throw new JsonParseException(e);
+                }
+            }
+        }
+
+        public List<Referenceable> getEntities() {
+            return entities;
+        }
+
+        public void setEntities(List<Referenceable> entities) {
+            this.entities = entities;
+        }
+
+        /**
+         * Appends a textual dump of this request to the given builder.
+         *
+         * @param sb builder to append to; a new one is created when null
+         * @return the builder that was appended to
+         */
+        @Override
+        public StringBuilder toString(StringBuilder sb) {
+            if (sb == null) {
+                sb = new StringBuilder();
+            }
+
+            sb.append("EntityCreateRequest{");
+            super.toString(sb);
+            // separator was missing, which ran the base dump straight into "entities="
+            sb.append(", entities=[");
+            AtlasBaseTypeDef.dumpObjects(getEntities(), sb);
+            sb.append("]");
+            sb.append("}");
+
+            return sb;
+        }
+    }
+
+    /**
+     * Hook message for updating entities(full update).
+     */
+    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+    @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+    @JsonIgnoreProperties(ignoreUnknown=true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class EntityUpdateRequest extends EntityCreateRequest implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public EntityUpdateRequest() {
+        }
+
+        public EntityUpdateRequest(String user, Referenceable... entities) {
+            this(user, Arrays.asList(entities));
+        }
+
+        public EntityUpdateRequest(String user, List<Referenceable> entities) {
+            super(HookNotificationType.ENTITY_FULL_UPDATE, entities, user);
+        }
+
+        /**
+         * Appends a textual dump of this request to the given builder.
+         *
+         * @param sb builder to append to; a new one is created when null
+         * @return the builder that was appended to
+         */
+        @Override
+        public StringBuilder toString(StringBuilder sb) {
+            if (sb == null) {
+                sb = new StringBuilder();
+            }
+
+            sb.append("EntityUpdateRequest{");
+            // the parent dump already lists the entities; appending them again here
+            // doubled the output, so only the wrapper is added
+            super.toString(sb);
+            sb.append("}");
+
+            return sb;
+        }
+    }
+
+    /**
+     * Hook message for updating entities(partial update). Carries a type name, an
+     * attribute name and value, and the entity with the updated values.
+     */
+    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+    @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+    @JsonIgnoreProperties(ignoreUnknown=true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class EntityPartialUpdateRequest extends HookNotificationMessage implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private String        typeName;
+        private String        attribute;
+        private String        attributeValue;
+        private Referenceable entity;
+
+        public EntityPartialUpdateRequest() {
+        }
+
+        public EntityPartialUpdateRequest(String user, String typeName, String attribute, String attributeValue, Referenceable entity) {
+            super(HookNotificationType.ENTITY_PARTIAL_UPDATE, user);
+
+            this.typeName       = typeName;
+            this.attribute      = attribute;
+            this.attributeValue = attributeValue;
+            this.entity         = entity;
+        }
+
+        public String getTypeName() {
+            return typeName;
+        }
+
+        public void setTypeName(String typeName) {
+            this.typeName = typeName;
+        }
+
+        public String getAttribute() {
+            return attribute;
+        }
+
+        public void setAttribute(String attribute) {
+            this.attribute = attribute;
+        }
+
+        public String getAttributeValue() {
+            return attributeValue;
+        }
+
+        public void setAttributeValue(String attributeValue) {
+            this.attributeValue = attributeValue;
+        }
+
+        public Referenceable getEntity() {
+            return entity;
+        }
+
+        public void setEntity(Referenceable entity) {
+            this.entity = entity;
+        }
+
+        /**
+         * Appends a textual dump of this request to the given builder.
+         *
+         * @param sb builder to append to; a new one is created when null
+         * @return the builder that was appended to
+         */
+        @Override
+        public StringBuilder toString(StringBuilder sb) {
+            if (sb == null) {
+                sb = new StringBuilder();
+            }
+
+            sb.append("EntityPartialUpdateRequest{");
+            super.toString(sb);
+            // separators were missing, which ran all fields together
+            sb.append(", typeName=").append(typeName);
+            sb.append(", attribute=").append(attribute);
+            sb.append(", attributeValue=").append(attributeValue);
+            sb.append(", entity=");
+            if (entity != null) {
+                entity.toString(sb);
+            }
+            sb.append("}");
+
+            return sb;
+        }
+    }
+
+    /**
+     * Hook message for deleting entities. Carries a type name and an attribute
+     * name/value pair.
+     */
+    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+    @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+    @JsonIgnoreProperties(ignoreUnknown=true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class EntityDeleteRequest extends HookNotificationMessage implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private String typeName;
+        private String attribute;
+        private String attributeValue;
+
+        public EntityDeleteRequest() {
+        }
+
+        public EntityDeleteRequest(String user, String typeName, String attribute, String attributeValue) {
+            this(HookNotificationType.ENTITY_DELETE, user, typeName, attribute, attributeValue);
+        }
+
+        /**
+         * Lets subclasses reuse the same fields under a different notification type.
+         */
+        protected EntityDeleteRequest(HookNotificationType type, String user, String typeName, String attribute, String attributeValue) {
+            super(type, user);
+
+            this.typeName       = typeName;
+            this.attribute      = attribute;
+            this.attributeValue = attributeValue;
+        }
+
+        public String getTypeName() {
+            return typeName;
+        }
+
+        public void setTypeName(String typeName) {
+            this.typeName = typeName;
+        }
+
+        public String getAttribute() {
+            return attribute;
+        }
+
+        public void setAttribute(String attribute) {
+            this.attribute = attribute;
+        }
+
+        public String getAttributeValue() {
+            return attributeValue;
+        }
+
+        public void setAttributeValue(String attributeValue) {
+            this.attributeValue = attributeValue;
+        }
+
+        /**
+         * Appends a textual dump of this request to the given builder.
+         *
+         * @param sb builder to append to; a new one is created when null
+         * @return the builder that was appended to
+         */
+        @Override
+        public StringBuilder toString(StringBuilder sb) {
+            if (sb == null) {
+                sb = new StringBuilder();
+            }
+
+            sb.append("EntityDeleteRequest{");
+            super.toString(sb);
+            // separators were missing, which ran all fields together
+            sb.append(", typeName=").append(typeName);
+            sb.append(", attribute=").append(attribute);
+            sb.append(", attributeValue=").append(attributeValue);
+            sb.append("}");
+
+            return sb;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/intg/src/main/java/org/apache/atlas/v1/model/typedef/TypesDef.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/typedef/TypesDef.java b/intg/src/main/java/org/apache/atlas/v1/model/typedef/TypesDef.java
index 1e67839..f8bcfa3 100644
--- a/intg/src/main/java/org/apache/atlas/v1/model/typedef/TypesDef.java
+++ b/intg/src/main/java/org/apache/atlas/v1/model/typedef/TypesDef.java
@@ -18,6 +18,7 @@
package org.apache.atlas.v1.model.typedef;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -88,4 +89,30 @@ public class TypesDef implements Serializable {
public void setClassTypes(List<ClassTypeDefinition> classTypes) {
this.classTypes = classTypes;
}
+
+
+ @Override
+ public String toString() {
+ return toString(new StringBuilder()).toString();
+ }
+
+ /**
+ * Appends a dump of the enum, struct, trait and class type lists to a buffer.
+ *
+ * @param sb buffer to append to; a new one is created when null
+ * @return the buffer that received the output
+ */
+ public StringBuilder toString(StringBuilder sb) {
+ final StringBuilder out = (sb != null) ? sb : new StringBuilder();
+
+ out.append("TypesDef{").append("enumTypes=[");
+ AtlasBaseTypeDef.dumpObjects(enumTypes, out);
+
+ out.append("], structTypes=[");
+ AtlasBaseTypeDef.dumpObjects(structTypes, out);
+
+ out.append("], traitTypes=[");
+ AtlasBaseTypeDef.dumpObjects(traitTypes, out);
+
+ out.append("], classTypes=[");
+ AtlasBaseTypeDef.dumpObjects(classTypes, out);
+
+ out.append("]").append("}");
+
+ return out;
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index f412217..a225f3c 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -24,7 +24,7 @@ import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotification;
import org.apache.atlas.security.InMemoryJAASConfiguration;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.configuration.Configuration;
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index e3bb71c..fd0c4e4 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -18,7 +18,8 @@
package org.apache.atlas.kafka;
import org.apache.atlas.notification.AbstractNotificationConsumer;
-import org.apache.atlas.notification.MessageDeserializer;
+import org.apache.atlas.notification.AtlasNotificationMessageDeserializer;
+import org.apache.atlas.notification.NotificationInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,13 +42,18 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
private static final Logger LOG = LoggerFactory.getLogger(AtlasKafkaConsumer.class);
private final KafkaConsumer kafkaConsumer;
- private final boolean autoCommitEnabled;
- private long pollTimeoutMilliSeconds = 1000L;
+ private final boolean autoCommitEnabled;
+ private long pollTimeoutMilliSeconds = 1000L;
- public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
+ public AtlasKafkaConsumer(NotificationInterface.NotificationType notificationType, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
+ this(notificationType.getDeserializer(), kafkaConsumer, autoCommitEnabled, pollTimeoutMilliSeconds);
+ }
+
+ public AtlasKafkaConsumer(AtlasNotificationMessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
super(deserializer);
- this.kafkaConsumer = kafkaConsumer;
- this.autoCommitEnabled = autoCommitEnabled;
+
+ this.autoCommitEnabled = autoCommitEnabled;
+ this.kafkaConsumer = kafkaConsumer;
this.pollTimeoutMilliSeconds = pollTimeoutMilliSeconds;
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 6bb8d73..4d6b444 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -23,6 +23,7 @@ import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
@@ -40,6 +41,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
+import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
@@ -185,7 +187,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
Properties consumerProperties = getConsumerProperties(notificationType);
List<NotificationConsumer<T>> consumers = new ArrayList<>();
- AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(), getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs );
+ AtlasKafkaConsumer kafkaConsumer =new AtlasKafkaConsumer(notificationType, getKafkaConsumer(consumerProperties, notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs);
+
consumers.add(kafkaConsumer);
return consumers;
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
index 5bfe90e..a787141 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java
@@ -18,145 +18,30 @@
package org.apache.atlas.notification;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParseException;
-import com.google.gson.reflect.TypeToken;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
-import org.apache.atlas.type.AtlasType;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.MessageVersion;
+import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
-import java.lang.reflect.Type;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* Base notification message deserializer.
*/
public abstract class AbstractMessageDeserializer<T> extends AtlasNotificationMessageDeserializer<T> {
- private static final Map<Type, JsonDeserializer> DESERIALIZER_MAP = new HashMap<>();
-
- static {
- DESERIALIZER_MAP.put(ImmutableList.class, new ImmutableListDeserializer());
- DESERIALIZER_MAP.put(ImmutableMap.class, new ImmutableMapDeserializer());
- DESERIALIZER_MAP.put(JSONArray.class, new JSONArrayDeserializer());
- DESERIALIZER_MAP.put(Struct.class, new StructDeserializer());
- DESERIALIZER_MAP.put(Referenceable.class, new ReferenceableDeserializer());
- }
-
-
// ----- Constructors ----------------------------------------------------
/**
* Create a deserializer.
*
- * @param notificationMessageType the type of the notification message
* @param expectedVersion the expected message version
- * @param deserializerMap map of individual deserializers used to define this message deserializer
* @param notificationLogger logger for message version mismatch
*/
- public AbstractMessageDeserializer(Type notificationMessageType,
- MessageVersion expectedVersion,
- Map<Type, JsonDeserializer> deserializerMap,
- Logger notificationLogger) {
- super(notificationMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger);
+ public AbstractMessageDeserializer(TypeReference<T> messageType,
+ TypeReference<AtlasNotificationMessage<T>> notificationMessageType,
+ MessageVersion expectedVersion, Logger notificationLogger) {
+ super(messageType, notificationMessageType, expectedVersion, notificationLogger);
}
// ----- helper methods --------------------------------------------------
-
- private static Gson getDeserializer(Map<Type, JsonDeserializer> deserializerMap) {
- GsonBuilder builder = new GsonBuilder();
-
- for (Map.Entry<Type, JsonDeserializer> entry : DESERIALIZER_MAP.entrySet()) {
- builder.registerTypeAdapter(entry.getKey(), entry.getValue());
- }
-
- for (Map.Entry<Type, JsonDeserializer> entry : deserializerMap.entrySet()) {
- builder.registerTypeAdapter(entry.getKey(), entry.getValue());
- }
- return builder.create();
- }
-
-
- // ----- deserializer classes --------------------------------------------
-
- /**
- * Deserializer for ImmutableList.
- */
- protected static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> {
- public static final Type LIST_TYPE = new TypeToken<List<?>>() {
- }.getType();
-
- @Override
- public ImmutableList<?> deserialize(JsonElement json, Type type,
- JsonDeserializationContext context) {
- final List<?> list = context.deserialize(json, LIST_TYPE);
- return ImmutableList.copyOf(list);
- }
- }
-
- /**
- * Deserializer for ImmutableMap.
- */
- protected static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> {
-
- public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() {
- }.getType();
-
- @Override
- public ImmutableMap<?, ?> deserialize(JsonElement json, Type type,
- JsonDeserializationContext context) {
- final Map<?, ?> map = context.deserialize(json, MAP_TYPE);
- return ImmutableMap.copyOf(map);
- }
- }
-
- /**
- * Deserializer for JSONArray.
- */
- public static final class JSONArrayDeserializer implements JsonDeserializer<JSONArray> {
- @Override
- public JSONArray deserialize(final JsonElement json, final Type type,
- final JsonDeserializationContext context) {
- try {
- return new JSONArray(json.toString());
- } catch (JSONException e) {
- throw new JsonParseException(e.getMessage(), e);
- }
- }
- }
-
- /**
- * Deserializer for Struct.
- */
- protected static final class StructDeserializer implements JsonDeserializer<Struct> {
- @Override
- public Struct deserialize(final JsonElement json, final Type type,
- final JsonDeserializationContext context) {
- return context.deserialize(json, Struct.class);
- }
- }
-
- /**
- * Deserializer for Referenceable.
- */
- protected static final class ReferenceableDeserializer implements JsonDeserializer<Referenceable> {
- @Override
- public Referenceable deserialize(final JsonElement json, final Type type,
- final JsonDeserializationContext context) {
-
- return AtlasType.fromV1Json(json.toString(), Referenceable.class);
- }
- }
}