Posted to dev@atlas.apache.org by Sarath Subramanian <sa...@apache.org> on 2020/09/02 06:50:17 UTC

Re: Review Request 72636: NotificationHookConsumer: Concurrent Message Processing

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/72636/#review221774
-----------------------------------------------------------


Ship it!

- Sarath Subramanian


On Aug. 3, 2020, 4:18 p.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/72636/
> -----------------------------------------------------------
> 
> (Updated Aug. 3, 2020, 4:18 p.m.)
> 
> 
> Review request for atlas, Madhan Neethiraj, Nikhil Bonte, Nixon Rodrigues, and Sarath Subramanian.
> 
> 
> Bugs: ATLAS-3874
>     https://issues.apache.org/jira/browse/ATLAS-3874
> 
> 
> Repository: atlas
> 
> 
> Description
> -------
> 
> **Background**
> Please see the bug description.
> 
> **New Approach**
> The new patch now uses the improvements made by ATLAS-3398.
> 
> Modified *NotificationHookConsumer*: Uses the existing retry logic. Handles exceptions encountered during commit and logs concise messages so that the log file is not overwhelmed.
> Bug fix: *EntityGraphDiscoveryContext*: *resolvedIdsByUniqAttribs* did not return the correct value when the map contained an *AtlasRelationshipObjectId*. The logic in *getResolvedEntityVertex* has been tweaked to handle *AtlasRelationshipObjectId* while maintaining backward compatibility. A new method, *getAtlasVertexFromResolvedIdsByAttribs*, encapsulates the existing processing of subTypes.
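The retry-and-log behaviour described for *NotificationHookConsumer* can be illustrated with a small, self-contained sketch. It is hypothetical and is not the code in the patch. The class `RetryingCommit`, the method `runWithRetry`, the attempt limit and the linear backoff are all assumptions. It shows one way to keep the log readable: each intermediate failure produces a single line without a stack trace, and the full stack trace is logged only once, after the final attempt fails.

```java
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper, not part of Apache Atlas: retries a commit-like action a bounded number of times.
class RetryingCommit {
    private static final Logger LOG = Logger.getLogger(RetryingCommit.class.getName());

    static <T> T runWithRetry(Callable<T> action, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Intermediate failure: one short line, no stack trace.
                    LOG.warning("commit failed (attempt " + attempt + "/" + maxAttempts + "): " + e.getMessage());
                    Thread.sleep(backoffMillis * attempt); // assumed linear backoff
                } else {
                    // Final failure: log the full stack trace exactly once.
                    LOG.log(Level.SEVERE, "commit failed after " + maxAttempts + " attempts", e);
                }
            }
        }
        throw last; // rethrow the last failure to the caller
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = runWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("simulated lock contention");
            }
            return "committed";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Running `main` prints `committed after 3 attempts` to standard output and writes two warning records to standard error. The exception type to retry on and the limits are placeholders. Which exceptions the real consumer treats as retryable is defined by the patch, not by this sketch.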
> 
> **Earlier Approach** (Obsolete)
> At a high level: introduce a notion where the individual consumers are aware of the entities being processed by each other. If no entity is being processed concurrently, everything proceeds as usual (the way it was before this change). If the same entity is being processed by multiple consumers, one consumer waits for the other to finish before proceeding.
> 
> Classes:
> New *UniqueKeysExtractor*: Extracts values of unique keys from *AtlasEntitiesWithExtInfo*. It navigates *relationshipAttributes* and *attributes* that have *objectRef* set.
> New *UniquenessChecker*: Maintains a set of unique keys provided by *UniqueKeysExtractor*. It detects the presence of duplicates and waits until duplicates are resolved.
> Modified *NotificationHookConsumer.createOrUpdate*
> - Updates *UniquenessChecker* with output from *UniqueKeysExtractor*. Clears the keys at the end of entity creation.
> - Handles JanusGraph's *PermanentLockingException*. See [ATLAS-3801](https://issues.apache.org/jira/browse/ATLAS-3801).
> Modified *NotificationHookConsumer*: Accepts a shared instance of *UniquenessChecker*.
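The wait-until-resolved behaviour attributed to *UniquenessChecker* in the (obsolete) earlier approach can be sketched with a plain Java monitor. This is a hypothetical illustration, not the class from the patch. The names `InFlightKeyTracker`, `acquire`, `release` and `isHeld` are assumptions, and so is the string key format in `main`. A consumer claims every unique key of its message in one step. If another consumer already holds any of those keys, it logs once and waits. Releasing the keys at the end of entity creation wakes the waiting consumers.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical sketch: tracks unique keys currently being processed by any consumer.
class InFlightKeyTracker {
    private static final Logger LOG = Logger.getLogger(InFlightKeyTracker.class.getName());
    private final Set<String> inFlight = new HashSet<>();

    // Blocks until none of the keys is held by another consumer, then claims all of them at once.
    public synchronized void acquire(Collection<String> keys) throws InterruptedException {
        boolean logged = false;
        while (overlaps(keys)) {          // loop guards against spurious wakeups
            if (!logged) {
                LOG.info("waiting: another consumer is processing one of " + keys);
                logged = true;
            }
            wait();
        }
        inFlight.addAll(keys);
    }

    // Clears the keys at the end of entity creation and wakes up waiting consumers.
    public synchronized void release(Collection<String> keys) {
        inFlight.removeAll(keys);
        notifyAll();
    }

    public synchronized boolean isHeld(String key) {
        return inFlight.contains(key);
    }

    private boolean overlaps(Collection<String> keys) {
        for (String key : keys) {
            if (inFlight.contains(key)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        InFlightKeyTracker tracker = new InFlightKeyTracker();
        List<String> keys = List.of("hive_table:default.t1@cl1"); // illustrative key format
        tracker.acquire(keys);
        Thread second = new Thread(() -> {
            try {
                tracker.acquire(keys);     // blocks until the main thread releases
                System.out.println("second consumer proceeds");
                tracker.release(keys);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        second.start();
        Thread.sleep(100);                 // give the second consumer time to start waiting
        tracker.release(keys);
        second.join();
    }
}
```

All keys of a message are claimed under one monitor on an all-or-nothing basis. As a result, two consumers can never each hold part of the other's key set, which rules out lock-ordering deadlocks. The cost is a single shared monitor. It serializes only the key bookkeeping, not the entity processing itself.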
> 
> **Additional Logging**
> A log entry is added when an attempt is made to create entities concurrently.
> 
> 
> Diffs
> -----
> 
>   repository/src/main/java/org/apache/atlas/repository/store/graph/EntityGraphDiscoveryContext.java 2221ac4f4 
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java 3f1ea05e1 
> 
> 
> Diff: https://reviews.apache.org/r/72636/diff/4/
> 
> 
> Testing
> -------
> 
> **Unit tests**
> Tests added to verify the new classes.
> 
> *UniquenessCheckerTest*
> Performs worst-case checking by adding hundreds of duplicate keys and verifying the output.
> 
> **Functional tests**
> Used the Spark hook to verify. This sequence of commands creates a shell entity and a first-class entity. If resolution for the first-class entity is available, no shell entity is created.
> 
> Start Spark shell using:
> ```
> sudo -u hdfs spark-shell
> ```
> 
> Spark sql commands:
> ```
> spark.sql("create table default.t1_1381104676(col1 int)")
> spark.sql("create table default.t2_1381104676(col2 int)")
> spark.sql("select * from t1_1381104676, t2_1381104676 where col1=col2").write.saveAsTable("t3_1381104676")
> ```
> 
> **Volume test**
> Medium-size Kafka dump added.
> 
> *Setup*: Add the same Kafka dump to multiple topics. This results in contention for almost every message.
> 
> **Pre-commit Build**
> https://builds.apache.org/view/A/view/Atlas/job/PreCommit-ATLAS-Build-Test/2078/
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>