You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/03/02 18:28:51 UTC
[atlas] 02/02: ATLAS-3642: PC fx: WorkItemManager getResults Modification.
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit cc601d7371fae1dbc16b55d1ca84f06b745700dc
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Mon Mar 2 09:17:41 2020 -0800
ATLAS-3642: PC fx: WorkItemManager getResults Modification.
---
intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java | 11 ++++-------
intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java | 9 +++++----
2 files changed, 9 insertions(+), 11 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
index 9ba4bf4..dd76697 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
@@ -21,6 +21,7 @@ package org.apache.atlas.pc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -37,7 +38,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
private final AtomicBoolean isDirty = new AtomicBoolean(false);
private final AtomicLong maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS);
private CountDownLatch countdownLatch;
- private BlockingQueue<Object> results;
+ private Queue<Object> results;
public WorkItemConsumer(BlockingQueue<T> queue) {
this.queue = queue;
@@ -101,11 +102,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
protected abstract void processItem(T item);
protected void addResult(Object value) {
- try {
- results.put(value);
- } catch (InterruptedException e) {
- LOG.error("Interrupted while adding result: {}", value);
- }
+ results.add(value);
}
protected void updateCommitTime(long commitTime) {
@@ -118,7 +115,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
this.countdownLatch = countdownLatch;
}
- public <V> void setResults(BlockingQueue<Object> queue) {
+ public <V> void setResults(Queue<Object> queue) {
this.results = queue;
}
}
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
index a7ba67c..351421e 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.*;
public class WorkItemManager<T, U extends WorkItemConsumer> {
@@ -33,7 +34,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
private final ExecutorService service;
private final List<U> consumers = new ArrayList<>();
private CountDownLatch countdownLatch;
- private BlockingQueue<Object> resultsQueue;
+ private Queue<Object> resultsQueue;
public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) {
this.numWorkers = numWorkers;
@@ -49,13 +50,13 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
this(builder, "workItemConsumer", batchSize, numWorkers, false);
}
- public void setResultsCollection(BlockingQueue<Object> resultsQueue) {
+ public void setResultsCollection(Queue<Object> resultsQueue) {
this.resultsQueue = resultsQueue;
}
private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) {
if (collectResults) {
- setResultsCollection(new LinkedBlockingQueue<>());
+ setResultsCollection(new ConcurrentLinkedQueue<>());
}
for (int i = 0; i < numWorkers; i++) {
@@ -124,7 +125,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
LOG.info("WorkItemManager: Shutdown done!");
}
- public BlockingQueue getResults() {
+ public Queue getResults() {
return this.resultsQueue;
}