You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/03/02 18:28:51 UTC

[atlas] 02/02: ATLAS-3642: PC fx: WorkItemManager getResults Modification.

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit cc601d7371fae1dbc16b55d1ca84f06b745700dc
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Mon Mar 2 09:17:41 2020 -0800

    ATLAS-3642: PC fx: WorkItemManager getResults Modification.
---
 intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java | 11 ++++-------
 intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java  |  9 +++++----
 2 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
index 9ba4bf4..dd76697 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
@@ -21,6 +21,7 @@ package org.apache.atlas.pc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -37,7 +38,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
     private final AtomicBoolean    isDirty           = new AtomicBoolean(false);
     private final AtomicLong       maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS);
     private CountDownLatch         countdownLatch;
-    private BlockingQueue<Object>  results;
+    private Queue<Object>          results;
 
     public WorkItemConsumer(BlockingQueue<T> queue) {
         this.queue          = queue;
@@ -101,11 +102,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
     protected abstract void processItem(T item);
 
     protected void addResult(Object value) {
-        try {
-            results.put(value);
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted while adding result: {}", value);
-        }
+        results.add(value);
     }
 
     protected void updateCommitTime(long commitTime) {
@@ -118,7 +115,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
         this.countdownLatch = countdownLatch;
     }
 
-    public <V> void setResults(BlockingQueue<Object> queue) {
+    public <V> void setResults(Queue<Object> queue) {
         this.results = queue;
     }
 }
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
index a7ba67c..351421e 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.*;
 
 public class WorkItemManager<T, U extends WorkItemConsumer> {
@@ -33,7 +34,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
     private final ExecutorService  service;
     private final List<U>          consumers = new ArrayList<>();
     private CountDownLatch         countdownLatch;
-    private BlockingQueue<Object>  resultsQueue;
+    private Queue<Object>          resultsQueue;
 
     public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) {
         this.numWorkers = numWorkers;
@@ -49,13 +50,13 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
         this(builder, "workItemConsumer", batchSize, numWorkers, false);
     }
 
-    public void setResultsCollection(BlockingQueue<Object> resultsQueue) {
+    public void setResultsCollection(Queue<Object> resultsQueue) {
         this.resultsQueue = resultsQueue;
     }
 
     private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) {
         if (collectResults) {
-            setResultsCollection(new LinkedBlockingQueue<>());
+            setResultsCollection(new ConcurrentLinkedQueue<>());
         }
 
         for (int i = 0; i < numWorkers; i++) {
@@ -124,7 +125,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
         LOG.info("WorkItemManager: Shutdown done!");
     }
 
-    public BlockingQueue getResults() {
+    public Queue getResults() {
         return this.resultsQueue;
     }