You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/05/12 16:59:08 UTC

[atlas] branch master updated: ATLAS-4164: [Atlas: Spooling] Tables created after spooling are created before the spooled tables when there are multiple frequent restarts of Kafka brokers

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 4100684  ATLAS-4164: [Atlas: Spooling] Tables created after spooling are created before the spooled tables when there are multiple frequent restarts of Kafka brokers
4100684 is described below

commit 4100684fa3f63cb2a6267ab24051002ce38de017
Author: Radhika Kundam <rk...@cloudera.com>
AuthorDate: Tue May 11 18:00:49 2021 -0700

    ATLAS-4164: [Atlas: Spooling] Tables created after spooling are created before the spooled tables when there are multiple frequent restarts of Kafka brokers
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
 .../org/apache/atlas/notification/spool/IndexManagement.java   | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
index b3a586b..f018983 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
@@ -98,8 +98,9 @@ public class IndexManagement {
     }
 
     public boolean isPending() {
-        return !indexReader.isEmpty() ||
-                (indexWriter.getCurrent() != null && indexWriter.getCurrent().getLine() > 0);
+        return !indexReader.isEmpty()
+                || (indexWriter.getCurrent() != null && indexWriter.getCurrent().isStatusWriteInProgress())
+                || (indexReader.currentIndexRecord != null && indexReader.currentIndexRecord.getStatus() == IndexRecord.STATUS_READ_IN_PROGRESS);
     }
 
     public synchronized DataOutput getSpoolWriter() throws IOException {
@@ -146,6 +147,8 @@ public class IndexManagement {
 
     public void update(IndexRecord record) {
         this.indexFileManager.updateIndex(record);
+
+        LOG.info("this.indexFileManager.updateIndex: {}", record.getLine());
     }
 
     public void flushSpoolWriter() throws IOException {
@@ -349,6 +352,9 @@ public class IndexManagement {
 
         public IndexRecord next() throws InterruptedException {
             this.currentIndexRecord = blockingQueue.poll(retryDestinationMS, TimeUnit.MILLISECONDS);
+            if (this.currentIndexRecord != null) {
+                this.currentIndexRecord.setStatus(IndexRecord.STATUS_READ_IN_PROGRESS);
+            }
 
             return this.currentIndexRecord;
         }