You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/05/12 16:59:08 UTC
[atlas] branch master updated: ATLAS-4164: [Atlas: Spooling] Tables
created after spooling are created before the spooled tables when there is
multiple frequent restart in kafka brokers
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 4100684 ATLAS-4164: [Atlas: Spooling] Tables created after spooling are created before the spooled tables when there is multiple frequent restart in kafka brokers
4100684 is described below
commit 4100684fa3f63cb2a6267ab24051002ce38de017
Author: Radhika Kundam <rk...@cloudera.com>
AuthorDate: Tue May 11 18:00:49 2021 -0700
ATLAS-4164: [Atlas: Spooling] Tables created after spooling are created before the spooled tables when there is multiple frequent restart in kafka brokers
Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
.../org/apache/atlas/notification/spool/IndexManagement.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
index b3a586b..f018983 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
@@ -98,8 +98,9 @@ public class IndexManagement {
}
public boolean isPending() {
- return !indexReader.isEmpty() ||
- (indexWriter.getCurrent() != null && indexWriter.getCurrent().getLine() > 0);
+ return !indexReader.isEmpty()
+ || (indexWriter.getCurrent() != null && indexWriter.getCurrent().isStatusWriteInProgress())
+ || (indexReader.currentIndexRecord != null && indexReader.currentIndexRecord.getStatus() == IndexRecord.STATUS_READ_IN_PROGRESS);
}
public synchronized DataOutput getSpoolWriter() throws IOException {
@@ -146,6 +147,8 @@ public class IndexManagement {
public void update(IndexRecord record) {
this.indexFileManager.updateIndex(record);
+
+ LOG.info("this.indexFileManager.updateIndex: {}", record.getLine());
}
public void flushSpoolWriter() throws IOException {
@@ -349,6 +352,9 @@ public class IndexManagement {
public IndexRecord next() throws InterruptedException {
this.currentIndexRecord = blockingQueue.poll(retryDestinationMS, TimeUnit.MILLISECONDS);
+ if (this.currentIndexRecord != null) {
+ this.currentIndexRecord.setStatus(IndexRecord.STATUS_READ_IN_PROGRESS);
+ }
return this.currentIndexRecord;
}