You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by si...@apache.org on 2021/08/03 05:59:23 UTC

[atlas] 08/11: ATLAS-4164: [Atlas: Spooling] Tables created after spooling are created before the spooled tables when there is multiple frequent restart in kafka brokers

This is an automated email from the ASF dual-hosted git repository.

sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 16759fabb1e397169633fff05b4f4599ac3707b6
Author: Radhika Kundam <rk...@cloudera.com>
AuthorDate: Tue May 11 18:00:49 2021 -0700

    ATLAS-4164: [Atlas: Spooling] Tables created after spooling are created before the spooled tables when there is multiple frequent restart in kafka brokers
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
    (cherry picked from commit 4100684fa3f63cb2a6267ab24051002ce38de017)
---
 .../org/apache/atlas/notification/spool/IndexManagement.java   | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
index 28f9c70..adbb8d1 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
@@ -98,8 +98,9 @@ public class IndexManagement {
     }
 
     public boolean isPending() {
-        return !indexReader.isEmpty() ||
-                (indexWriter.getCurrent() != null && indexWriter.getCurrent().getLine() > 0);
+        return !indexReader.isEmpty()
+                || (indexWriter.getCurrent() != null && indexWriter.getCurrent().isStatusWriteInProgress())
+                || (indexReader.currentIndexRecord != null && indexReader.currentIndexRecord.getStatus() == IndexRecord.STATUS_READ_IN_PROGRESS);
     }
 
     public synchronized DataOutput getSpoolWriter() throws IOException {
@@ -146,6 +147,8 @@ public class IndexManagement {
 
     public void update(IndexRecord record) {
         this.indexFileManager.updateIndex(record);
+
+        LOG.info("this.indexFileManager.updateIndex: {}", record.getLine());
     }
 
     public void flushSpoolWriter() throws IOException {
@@ -349,6 +352,9 @@ public class IndexManagement {
 
         public IndexRecord next() throws InterruptedException {
             this.currentIndexRecord = blockingQueue.poll(retryDestinationMS, TimeUnit.MILLISECONDS);
+            if (this.currentIndexRecord != null) {
+                this.currentIndexRecord.setStatus(IndexRecord.STATUS_READ_IN_PROGRESS);
+            }
 
             return this.currentIndexRecord;
         }