You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2015/09/26 20:27:55 UTC
ambari git commit: AMBARI-13248: Parallel library should process all
futures even if one of them throws an exception (jluniya)
Repository: ambari
Updated Branches:
refs/heads/trunk 156afda50 -> 474e40862
AMBARI-13248: Parallel library should process all futures even if one of them throws an exception (jluniya)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/474e4086
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/474e4086
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/474e4086
Branch: refs/heads/trunk
Commit: 474e40862f0e66d12d333d83509453ab5c8728e6
Parents: 156afda
Author: Jayush Luniya <jl...@hortonworks.com>
Authored: Sat Sep 26 11:27:32 2015 -0700
Committer: Jayush Luniya <jl...@hortonworks.com>
Committed: Sat Sep 26 11:27:32 2015 -0700
----------------------------------------------------------------------
.../apache/ambari/server/utils/Parallel.java | 30 ++++++++++------
.../ambari/server/utils/TestParallel.java | 37 +++++++++++++++++++-
2 files changed, 56 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/474e4086/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
index c6e2156..0a3e6c4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
@@ -185,11 +186,16 @@ public class Parallel {
boolean completed = true;
R[] result = (R[]) new Object[futures.size()];
- try {
- for (int i = 0; i < futures.size(); i++) {
- Future<ResultWrapper<R>> futureResult = completionService.poll(POLL_DURATION_MILLISECONDS, TimeUnit.MILLISECONDS);
+ for (int i = 0; i < futures.size(); i++) {
+ try {
+ Future<ResultWrapper<R>> futureResult = null;
+ try {
+ futureResult = completionService.poll(POLL_DURATION_MILLISECONDS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.error("Caught InterruptedException in Parallel.forLoop", e);
+ }
if (futureResult == null) {
- // Time out! no progress was made during the last poll duration. Abort the threads and cancel the threads.
+ // Timed out! No progress was made during the last poll duration. Cancel all the tasks that are still outstanding.
LOG.error("Completion service in Parallel.forLoop timed out!");
completed = false;
for(int fIndex = 0; fIndex < futures.size(); fIndex++) {
@@ -204,6 +210,7 @@ public class Parallel {
LOG.debug(" Task - {} successfully cancelled", fIndex);
}
}
+ // Timed out and finished cancelling the futures; stop polling for further results
break;
} else {
ResultWrapper<R> res = futureResult.get();
@@ -214,13 +221,16 @@ public class Parallel {
completed = false;
}
}
+ } catch (InterruptedException e) {
+ LOG.error("Caught InterruptedException in Parallel.forLoop", e);
+ completed = false;
+ } catch (ExecutionException e) {
+ LOG.error("Caught ExecutionException in Parallel.forLoop", e);
+ completed = false;
+ } catch (CancellationException e) {
+ LOG.error("Caught CancellationException in Parallel.forLoop", e);
+ completed = false;
}
- } catch (InterruptedException e) {
- LOG.error("Caught InterruptedException in Parallel.forLoop", e);
- completed = false;
- } catch (ExecutionException e) {
- LOG.error("Caught ExecutionException in Parallel.forLoop", e);
- completed = false;
}
// Return parallel loop result
return new ParallelLoopResult<R>(completed, Arrays.asList(result));
http://git-wip-us.apache.org/repos/asf/ambari/blob/474e4086/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
index 0628f20..bfeb446 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
@@ -109,7 +109,7 @@ public class TestParallel {
* @throws Exception
*/
@Test
- public void testNestedParallelForLoopIterationFailures() throws Exception {
+ public void testNestedParallelForLoop() throws Exception {
final List<Integer> input = new LinkedList<Integer>();
for(int i = 0; i < 10; i++) {
input.add(i);
@@ -185,4 +185,39 @@ public class TestParallel {
}
}
}
+
+ /**
+ * Tests {@link org.apache.ambari.server.utils.Parallel} forLoop when some iterations throw: the loop must report not-completed, leave null for the failed inputs and still return results for the rest.
+ * @throws Exception
+ */
+ @Test
+ public void testParallelForLoopIterationExceptions() throws Exception {
+ final List<Integer> input = new LinkedList<Integer>();
+ for(int i = 0; i < 10; i++) {
+ input.add(i);
+ }
+ final List<Integer> failForList = Arrays.asList(new Integer[] { 2, 5, 7}); // inputs for which the loop body throws
+ ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new LoopBody<Integer, Integer>() {
+ @Override
+ public Integer run(Integer in1) {
+ if(failForList.contains(in1)) {
+ throw new RuntimeException("Ignore this exception");
+ }
+ return in1 * in1;
+ }
+ });
+ Assert.assertFalse(loopResult.getIsCompleted()); // some iterations threw, so the loop must not be reported as completed
+ Assert.assertNotNull(loopResult.getResult());
+ List<Integer> output = loopResult.getResult();
+ Assert.assertEquals(input.size(), output.size()); // one result slot per input, including the failed ones
+
+ for(int i = 0; i < input.size(); i++) { // input holds 0..9, so index i is also the input value
+ if(failForList.contains(i)) {
+ Assert.assertNull(output.get(i)); // a failed iteration leaves its slot null
+ output.set(i, i * i); // NOTE(review): the value written here is never read afterwards; looks redundant, confirm intent
+ } else {
+ Assert.assertEquals(i * i, (int) output.get(i));
+ }
+ }
+ }
}