You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by si...@apache.org on 2021/08/03 05:59:23 UTC
[atlas] 08/11: ATLAS-4164: [Atlas: Spooling] Tables created after
spooling are created before the spooled tables when there is multiple
frequent restart in kafka brokers
This is an automated email from the ASF dual-hosted git repository.
sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 16759fabb1e397169633fff05b4f4599ac3707b6
Author: Radhika Kundam <rk...@cloudera.com>
AuthorDate: Tue May 11 18:00:49 2021 -0700
ATLAS-4164: [Atlas: Spooling] Tables created after spooling are created before the spooled tables when there is multiple frequent restart in kafka brokers
Signed-off-by: Sarath Subramanian <sa...@apache.org>
(cherry picked from commit 4100684fa3f63cb2a6267ab24051002ce38de017)
---
.../org/apache/atlas/notification/spool/IndexManagement.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
index 28f9c70..adbb8d1 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
@@ -98,8 +98,9 @@ public class IndexManagement {
}
public boolean isPending() {
- return !indexReader.isEmpty() ||
- (indexWriter.getCurrent() != null && indexWriter.getCurrent().getLine() > 0);
+ return !indexReader.isEmpty()
+ || (indexWriter.getCurrent() != null && indexWriter.getCurrent().isStatusWriteInProgress())
+ || (indexReader.currentIndexRecord != null && indexReader.currentIndexRecord.getStatus() == IndexRecord.STATUS_READ_IN_PROGRESS);
}
public synchronized DataOutput getSpoolWriter() throws IOException {
@@ -146,6 +147,8 @@ public class IndexManagement {
public void update(IndexRecord record) {
this.indexFileManager.updateIndex(record);
+
+ LOG.info("this.indexFileManager.updateIndex: {}", record.getLine());
}
public void flushSpoolWriter() throws IOException {
@@ -349,6 +352,9 @@ public class IndexManagement {
public IndexRecord next() throws InterruptedException {
this.currentIndexRecord = blockingQueue.poll(retryDestinationMS, TimeUnit.MILLISECONDS);
+ if (this.currentIndexRecord != null) {
+ this.currentIndexRecord.setStatus(IndexRecord.STATUS_READ_IN_PROGRESS);
+ }
return this.currentIndexRecord;
}