Posted to dev@atlas.apache.org by Deep Singh <de...@gmail.com> on 2021/06/11 18:10:51 UTC

Review Request 73418: ATLAS-4335: Hook Notifications through Rest Interface

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/73418/
-----------------------------------------------------------

Review request for atlas, Ashutosh Mestry, Madhan Neethiraj, Radhika Kundam, Sarath Subramanian, Sidharth Mishra, and Umesh Padashetty.


Bugs: ATLAS-4335
    https://issues.apache.org/jira/browse/ATLAS-4335


Repository: atlas


Description
-------

There are 4 parts to this project.

---------------------------------------------------------

The first part is the Notification REST Service.

This part adds a new POST API (api/atlas/v2/notification/topic/\<topicName>).
NotificationREST.java is the entry point; its handleNotifications method verifies that the incoming request is authorized.
It then checks that the provided topic name is one of the supported topic names.
After validation, it hands the incoming messages over to the Kafka notifier, which publishes them to the Kafka topic.
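
As a rough illustration of the flow described here (authorize, validate the topic name, then hand the messages to the notifier), a minimal Java sketch follows. It is not the code in NotificationREST.java: the class name, the Result enum, the Notifier interface and the contents of SUPPORTED_TOPICS are assumptions made only for this example.

```java
import java.util.List;
import java.util.Set;

final class NotificationEndpointSketch {
    // Assumption: the review does not list the supported topic names.
    static final Set<String> SUPPORTED_TOPICS = Set.of("ATLAS_HOOK");

    // Hypothetical outcomes; the real endpoint's responses are not described in the review.
    enum Result { UNAUTHORIZED, INVALID_TOPIC, ACCEPTED }

    /** Hypothetical stand-in for the Kafka notifier the endpoint hands messages to. */
    interface Notifier {
        void send(String topicName, List<String> messages);
    }

    static Result handleNotifications(String topicName, List<String> messages,
                                      boolean authorized, Notifier notifier) {
        // 1. Reject requests that are not authorized.
        if (!authorized) {
            return Result.UNAUTHORIZED;
        }
        // 2. Reject topic names that are not in the supported set.
        if (topicName == null || !SUPPORTED_TOPICS.contains(topicName)) {
            return Result.INVALID_TOPIC;
        }
        // 3. Hand the messages over to the notifier.
        notifier.send(topicName, messages);
        return Result.ACCEPTED;
    }
}
```

Nothing reaches the notifier unless both checks pass, which is the ordering the description implies.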

An integration test (NotificationRestIT.java) is also added to cover this new API.

Impacted files are listed below.

webapp/src/main/java/org/apache/atlas/web/rest/NotificationREST.java
authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
intg/src/main/java/org/apache/atlas/AtlasErrorCode.java

webapp/src/test/java/org/apache/atlas/web/integration/NotificationRestIT.java
webapp/src/test/resources/json/notifications/create-db-ddl.json
webapp/src/test/resources/json/notifications/create-db.json
webapp/src/test/resources/json/notifications/delete-db.json

-----------------------------------------------------------

The second part is the REST Notifier.

In this part, a new notifier (RestNotification.java) is created. It extends AbstractNotification and works along the same lines as KafkaNotification and AtlasFileSpool, except that it uses AtlasClientV2 to send the notifications to the REST API described above.
A new API endpoint is added to AtlasClientV2. NotificationProvider is modified to instantiate the new REST notifier based on the configuration flag (atlas.hook.rest.notification.enabled).
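
The flag-based selection can be pictured with the following Java sketch. Only the property name comes from this change; the class, the Kind enum, the use of java.util.Properties, the default of "false" and the fallback to a Kafka notifier are assumptions for illustration, not the actual NotificationProvider code.

```java
import java.util.Properties;

final class NotifierSelectionSketch {
    // Property name taken from the review description.
    static final String REST_ENABLED_KEY = "atlas.hook.rest.notification.enabled";

    // Hypothetical labels for the two notifier implementations.
    enum Kind { KAFKA, REST }

    static Kind select(Properties conf) {
        // Assumption: the flag defaults to false, so Kafka stays the default route.
        boolean restEnabled = Boolean.parseBoolean(conf.getProperty(REST_ENABLED_KEY, "false"));
        return restEnabled ? Kind.REST : Kind.KAFKA;
    }
}
```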

A unit test (RestNotificationTest.java) has been added to cover the basic flows. AtlasBaseClient is modified to make one of its static variables available to this unit test.

Impacted files are listed below.

client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
notification/src/main/java/org/apache/atlas/notification/rest/RestNotification.java
intg/src/main/java/org/apache/atlas/AtlasConfiguration.java

notification/src/test/java/org/apache/atlas/notification/RestNotificationTest.java
client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java

-------------------------------------------------------------

The third part is the Buffered Kafka consumer (tumbling window solution).

This part addresses the reordering of out-of-order notifications. With the introduction of the new REST-based notification delivery mechanism (RestNotification), it was observed that notifications were landing in Kafka in a different order from the one in which the services produced them, particularly when two different services (HMS and HS2) generate events from the same trigger. This problem existed with the KafkaNotification delivery mechanism as well, but it surfaced far less frequently, because HS2 events are delayed and the Kafka client delivers messages to Kafka almost instantaneously. With the REST route, delivery is not instantaneous, especially in a non-Kerberized or non-token-based authentication setup.
To cover this, AtlasKafkaBufferedConsumer is created. It is a tumbling window solution over the stream of events. Instead of reading events one by one, it loads events from Kafka in batches (10 events by default). It keeps two such batches in memory: the current batch and the lookup batch. In each cycle it processes the current batch, but before processing, it pulls into the current batch every event in the lookup batch whose timestamp is lower than the greatest event timestamp in the current batch. It then sorts the events in the current batch by event timestamp, which fixes the order of the messages.
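
One cycle of the two-batch reordering can be sketched in Java as below. This is a simplified model, not AtlasKafkaBufferedConsumer itself: the Event class and the nextCycle method are hypothetical, and fetching batches from Kafka is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

final class TumblingWindowSketch {
    /** Hypothetical event: an id plus the creation timestamp used for ordering. */
    static final class Event {
        final String id;
        final long   timestamp;

        Event(String id, long timestamp) {
            this.id        = id;
            this.timestamp = timestamp;
        }
    }

    /**
     * Builds the list to process in this cycle. Events pulled forward are
     * removed from the lookup batch, so the lookup list must be mutable.
     */
    static List<Event> nextCycle(List<Event> current, List<Event> lookup) {
        List<Event> ready = new ArrayList<>(current);

        // Greatest event timestamp in the current batch.
        long maxTimestamp = Long.MIN_VALUE;
        for (Event e : current) {
            maxTimestamp = Math.max(maxTimestamp, e.timestamp);
        }

        // Pull forward every lookup event that is older than that timestamp.
        Iterator<Event> it = lookup.iterator();
        while (it.hasNext()) {
            Event e = it.next();
            if (e.timestamp < maxTimestamp) {
                ready.add(e);
                it.remove();
            }
        }

        // Sort by event timestamp to restore the production order.
        ready.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        return ready;
    }
}
```

For example, with a current batch of timestamps 100, 300, 200 and a lookup batch of 250, 400, the event at 250 is pulled forward and the cycle processes 100, 200, 250, 300, while 400 stays in the lookup batch.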

AtlasKafkaBufferedConsumer's constructor can throw AtlasException, so KafkaNotification and NotificationHookConsumer are modified to handle the exception.

A unit test (AtlasKafkaBufferedConsumerTest) was added to cover the flow extensively.

Impacted files are listed below.

notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaBufferedConsumer.java
notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java

webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
notification/src/test/java/org/apache/atlas/kafka/AtlasKafkaBufferedConsumerTest.java

-----------------------------------------------------------------

The fourth part is passing the message creation time along with the message to the hook executor.

Hook messages are wrapped in AtlasNotificationMessage, which also stores a timestamp. However, this timestamp bears the wrapper creation time, not the actual message creation time. AtlasHook buffers the incoming messages from the underlying service in a queue (courtesy of ThreadPoolExecutor). It processes these queued events sequentially, one by one (by default the pool has a single thread), and an event is wrapped in AtlasNotificationMessage only when it is picked up for processing. The wrapper timestamp therefore lags the actual event creation time by the number of milliseconds the event spends on the queue. This has not been a problem with KafkaNotification, as the delivery mechanism was very fast and events did not stay on the queue for more than a few milliseconds. With RestNotification, however, events end up staying on the queue for a significant number of milliseconds, as the delivery mechanism is comparatively slow. This is a critical problem, as the Buffered Kafka consumer mentioned in the previous part relies on the event creation time.
To fix this problem, the event creation time is passed along with the messages to the queue. After dequeue, the actual creation time is passed into the constructor of AtlasNotificationMessage.
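
The idea of capturing the creation time before the event is queued can be sketched in Java as follows. The Wrapper class is a hypothetical stand-in for AtlasNotificationMessage and wrapAfterQueueDelay is an invented helper; neither reflects the actual AtlasHook signatures. A sleeping task occupies the single worker thread first, to imitate an event waiting on the queue.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CreationTimeSketch {
    /** Hypothetical stand-in for AtlasNotificationMessage: a payload plus a timestamp. */
    static final class Wrapper {
        final String payload;
        final long   timestamp;

        Wrapper(String payload, long timestamp) {
            this.payload   = payload;
            this.timestamp = timestamp;
        }
    }

    /**
     * Queues an event behind a slow task on a single-thread executor and
     * returns the wrapper built once the event is finally dequeued.
     * creationTime is captured by the caller before the event is queued.
     */
    static Wrapper wrapAfterQueueDelay(String payload, long creationTime, long queueDelayMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Keep the only worker busy so the next task has to wait on the queue.
            Callable<Void> blocker = () -> {
                Thread.sleep(queueDelayMillis);
                return null;
            };
            executor.submit(blocker);

            // The wrapper is built at dequeue time, but with the captured creation
            // time rather than System.currentTimeMillis() at that later moment.
            Callable<Wrapper> wrapTask = () -> new Wrapper(payload, creationTime);
            return executor.submit(wrapTask).get();
        } catch (Exception e) {
            throw new IllegalStateException("wrapping task failed", e);
        } finally {
            executor.shutdown();
        }
    }
}
```

However long the event waits behind the blocker, the wrapper carries the timestamp supplied at enqueue time, which is what the buffered consumer needs for ordering.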

A unit test is added in AbstractNotificationTest to cover this case. Also, AtlasHookTest is modified to cover additional timestamp parameters.

Impacted files are listed below.

notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java
intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java
intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationStringMessage.java
notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java

notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java


Diffs
-----

  addons/falcon-bridge/pom.xml e0d2f3be7 
  addons/hbase-bridge/pom.xml 50fb9e838 
  addons/hive-bridge/pom.xml ea2de4d3b 
  addons/impala-bridge/pom.xml a1bd5920b 
  addons/sqoop-bridge/pom.xml 9d5ac52d9 
  authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java 5d06e1b29 
  client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java eb0e630f9 
  client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java cb35c9446 
  intg/src/main/java/org/apache/atlas/AtlasConfiguration.java 9ef848752 
  intg/src/main/java/org/apache/atlas/AtlasErrorCode.java 2febff469 
  intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationBaseMessage.java ff45d5713 
  intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java 5869910cb 
  intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationStringMessage.java bc8442785 
  notification/pom.xml 28d13bc13 
  notification/src/main/java/org/apache/atlas/hook/AtlasHook.java 9162ac144 
  notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaBufferedConsumer.java PRE-CREATION 
  notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java af3727df4 
  notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java 32f5183a0 
  notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java b35af97fd 
  notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java c45a1da95 
  notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b43bc7c66 
  notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java 3d8d9cc0a 
  notification/src/main/java/org/apache/atlas/notification/rest/RestNotification.java PRE-CREATION 
  notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java 0c92c300e 
  notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java 1ae7c278c 
  notification/src/test/java/org/apache/atlas/kafka/AtlasKafkaBufferedConsumerTest.java PRE-CREATION 
  notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java 8078a6ceb 
  notification/src/test/java/org/apache/atlas/notification/RestNotificationTest.java PRE-CREATION 
  webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java 5643af95c 
  webapp/src/main/java/org/apache/atlas/web/rest/NotificationREST.java PRE-CREATION 
  webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java fdfc2560d 
  webapp/src/test/java/org/apache/atlas/web/integration/NotificationRestIT.java PRE-CREATION 
  webapp/src/test/resources/json/notifications/create-db-ddl.json PRE-CREATION 
  webapp/src/test/resources/json/notifications/create-db.json PRE-CREATION 
  webapp/src/test/resources/json/notifications/delete-db.json PRE-CREATION 


Diff: https://reviews.apache.org/r/73418/diff/1/


Testing
-------


Thanks,

Deep Singh