You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2015/09/26 20:27:55 UTC

ambari git commit: AMBARI-13248: Parallel library should process all futures even if one of them throws an exception (jluniya)

Repository: ambari
Updated Branches:
  refs/heads/trunk 156afda50 -> 474e40862


AMBARI-13248: Parallel library should process all futures even if one of them throws an exception (jluniya)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/474e4086
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/474e4086
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/474e4086

Branch: refs/heads/trunk
Commit: 474e40862f0e66d12d333d83509453ab5c8728e6
Parents: 156afda
Author: Jayush Luniya <jl...@hortonworks.com>
Authored: Sat Sep 26 11:27:32 2015 -0700
Committer: Jayush Luniya <jl...@hortonworks.com>
Committed: Sat Sep 26 11:27:32 2015 -0700

----------------------------------------------------------------------
 .../apache/ambari/server/utils/Parallel.java    | 30 ++++++++++------
 .../ambari/server/utils/TestParallel.java       | 37 +++++++++++++++++++-
 2 files changed, 56 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/474e4086/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
index c6e2156..0a3e6c4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
@@ -185,11 +186,16 @@ public class Parallel {
 
     boolean completed = true;
     R[] result = (R[]) new Object[futures.size()];
-    try {
-      for (int i = 0; i < futures.size(); i++) {
-        Future<ResultWrapper<R>> futureResult = completionService.poll(POLL_DURATION_MILLISECONDS, TimeUnit.MILLISECONDS);
+    for (int i = 0; i < futures.size(); i++) {
+      try {
+        Future<ResultWrapper<R>> futureResult = null;
+        try {
+          futureResult = completionService.poll(POLL_DURATION_MILLISECONDS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          LOG.error("Caught InterruptedException in Parallel.forLoop", e);
+        }
         if (futureResult == null) {
-          // Time out! no progress was made during the last poll duration. Abort the threads and cancel the threads.
+          // Timed out! no progress was made during the last poll duration. Abort the threads and cancel the threads.
           LOG.error("Completion service in Parallel.forLoop timed out!");
           completed = false;
           for(int fIndex = 0; fIndex < futures.size(); fIndex++) {
@@ -204,6 +210,7 @@ public class Parallel {
               LOG.debug("    Task - {} successfully cancelled", fIndex);
             }
           }
+          // Finished processing all futures
           break;
         } else {
           ResultWrapper<R> res = futureResult.get();
@@ -214,13 +221,16 @@ public class Parallel {
             completed = false;
           }
         }
+      } catch (InterruptedException e) {
+        LOG.error("Caught InterruptedException in Parallel.forLoop", e);
+        completed = false;
+      } catch (ExecutionException e) {
+        LOG.error("Caught ExecutionException in Parallel.forLoop", e);
+        completed = false;
+      } catch (CancellationException e) {
+        LOG.error("Caught CancellationException in Parallel.forLoop", e);
+        completed = false;
       }
-    } catch (InterruptedException e) {
-      LOG.error("Caught InterruptedException in Parallel.forLoop", e);
-      completed = false;
-    } catch (ExecutionException e) {
-      LOG.error("Caught ExecutionException in Parallel.forLoop", e);
-      completed = false;
     }
     // Return parallel loop result
     return new ParallelLoopResult<R>(completed, Arrays.asList(result));

http://git-wip-us.apache.org/repos/asf/ambari/blob/474e4086/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
index 0628f20..bfeb446 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
@@ -109,7 +109,7 @@ public class TestParallel {
    * @throws Exception
    */
   @Test
-  public void testNestedParallelForLoopIterationFailures() throws Exception {
+  public void testNestedParallelForLoop() throws Exception {
     final List<Integer> input = new LinkedList<Integer>();
     for(int i = 0; i < 10; i++) {
       input.add(i);
@@ -185,4 +185,39 @@ public class TestParallel {
       }
     }
   }
+
+  /**
+   * Tests {@link org.apache.ambari.server.utils.Parallel} forLoop iteration exceptions
+   * @throws Exception
+   */
+  @Test
+  public void testParallelForLoopIterationExceptions() throws Exception {
+    final List<Integer> input = new LinkedList<Integer>();
+    for(int i = 0; i < 10; i++) {
+      input.add(i);
+    }
+    final List<Integer> failForList = Arrays.asList(new Integer[] { 2, 5, 7});
+    ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new LoopBody<Integer, Integer>() {
+      @Override
+      public Integer run(Integer in1) {
+        if(failForList.contains(in1)) {
+          throw new RuntimeException("Ignore this exception");
+        }
+        return in1 * in1;
+      }
+    });
+    Assert.assertFalse(loopResult.getIsCompleted());
+    Assert.assertNotNull(loopResult.getResult());
+    List<Integer> output = loopResult.getResult();
+    Assert.assertEquals(input.size(), output.size());
+
+    for(int i = 0; i < input.size(); i++) {
+      if(failForList.contains(i)) {
+        Assert.assertNull(output.get(i));
+        output.set(i, i * i);
+      } else {
+        Assert.assertEquals(i * i, (int) output.get(i));
+      }
+    }
+  }
 }