You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/07/13 00:36:57 UTC

[gobblin] branch master updated: throw failure with iteratorexecutor when allocating files (#3321)

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4edf10c  throw failure with iteratorexecutor when allocating files (#3321)
4edf10c is described below

commit 4edf10c3e112c213aa3dcb3299470eadd1cd61ce
Author: William Lo <lo...@gmail.com>
AuthorDate: Mon Jul 12 17:36:48 2021 -0700

    throw failure with iteratorexecutor when allocating files (#3321)
---
 .../PriorityIterableBasedRequestAllocator.java     |  3 ++-
 .../BruteForceAllocatorTest.java                   | 23 ++++++++++++++++++++++
 .../request_allocation/GreedyAllocatorTest.java    | 22 +++++++++++++++++++++
 .../request_allocation/PreOrderAllocatorTest.java  | 23 ++++++++++++++++++++++
 4 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
index f5be73e..be58e6d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
@@ -91,7 +91,8 @@ public abstract class PriorityIterableBasedRequestAllocator<T extends Request<T>
 
       try {
         List<Either<Void, ExecutionException>> results = executor.executeAndGetResults();
-        IteratorExecutor.logFailures(results, log, 10);
+        // Throw runtime failure if an exception occurs during execution to fail the job
+        IteratorExecutor.logAndThrowFailures(results, log, 10);
       } catch (InterruptedException ie) {
         log.error("Request allocation was interrupted.");
         return new AllocatedRequestsIteratorBase<>(
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/BruteForceAllocatorTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/BruteForceAllocatorTest.java
index 1243cd1..f5e7f20 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/BruteForceAllocatorTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/BruteForceAllocatorTest.java
@@ -24,6 +24,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
 
+import static org.mockito.Mockito.*;
 
 public class BruteForceAllocatorTest {
   @Test
@@ -53,4 +54,26 @@ public class BruteForceAllocatorTest {
     Assert.assertEquals(resultList.get(2).getString(), "d-30");
     Assert.assertEquals(resultList.get(3).getString(), "e-20");
   }
+
+  @Test
+  public void testThrowExceptionOnFailure() throws Exception {
+    ResourceEstimator<StringRequest> failingEstimator = mock(ResourceEstimator.class);
+    when(failingEstimator.estimateRequirement(any(), any())).thenThrow(new RuntimeException("Error"));
+
+    RequestAllocatorConfig<StringRequest> configuration =
+        RequestAllocatorConfig.builder(failingEstimator)
+            .allowParallelization()
+            .withPrioritizer(new StringRequest.StringRequestComparator()).build();
+    BruteForceAllocator<StringRequest> allocator =
+        new BruteForceAllocator<>(configuration);
+
+    ResourcePool pool = ResourcePool.builder().maxResource(StringRequest.MEMORY, 100.).build();
+
+    List<Requestor<StringRequest>> requests = Lists.<Requestor<StringRequest>>newArrayList(
+        new StringRequestor("r1", "a-50", "f-50", "k-20"),
+        new StringRequestor("r2", "j-10", "b-20", "e-20"),
+        new StringRequestor("r3", "g-20", "c-200", "d-30"));
+
+    Assert.expectThrows(RuntimeException.class, () -> allocator.allocateRequests(requests.iterator(), pool));
+  }
 }
\ No newline at end of file
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/GreedyAllocatorTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/GreedyAllocatorTest.java
index c1d52d6..ac293d3 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/GreedyAllocatorTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/GreedyAllocatorTest.java
@@ -26,6 +26,7 @@ import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import static org.mockito.Mockito.*;
 
 
 public class GreedyAllocatorTest {
@@ -58,4 +59,25 @@ public class GreedyAllocatorTest {
     })), Sets.newHashSet("a-50", "f-50"));
   }
 
+  @Test
+  public void testThrowExceptionOnFailure() throws Exception {
+    ResourceEstimator<StringRequest> failingEstimator = mock(ResourceEstimator.class);
+    when(failingEstimator.estimateRequirement(any(), any())).thenThrow(new RuntimeException("Error"));
+
+    RequestAllocatorConfig<StringRequest> configuration =
+        RequestAllocatorConfig.builder(failingEstimator)
+            .allowParallelization()
+            .withPrioritizer(new StringRequest.StringRequestComparator()).build();
+    GreedyAllocator<StringRequest> allocator = new GreedyAllocator<>(configuration);
+
+    ResourcePool pool = ResourcePool.builder().maxResource(StringRequest.MEMORY, 100.).build();
+
+    List<Requestor<StringRequest>> requests = Lists.<Requestor<StringRequest>>newArrayList(
+        new StringRequestor("r1", "a-50", "f-50", "k-20"),
+        new StringRequestor("r2", "j-10", "b-20", "e-20"),
+        new StringRequestor("r3", "g-20", "c-200", "d-30"));
+
+    Assert.expectThrows(RuntimeException.class, () -> allocator.allocateRequests(requests.iterator(), pool));
+  }
+
 }
\ No newline at end of file
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/PreOrderAllocatorTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/PreOrderAllocatorTest.java
index 38d5d99..a27970a 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/PreOrderAllocatorTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/PreOrderAllocatorTest.java
@@ -24,6 +24,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
 
+import static org.mockito.Mockito.*;
 
 public class PreOrderAllocatorTest {
 
@@ -61,4 +62,26 @@ public class PreOrderAllocatorTest {
     Assert.assertFalse(estimator.getQueriedRequests().contains("f-50"));
   }
 
+  @Test
+  public void testThrowExceptionOnFailure() throws Exception {
+    ResourceEstimator<StringRequest> failingEstimator = mock(ResourceEstimator.class);
+    when(failingEstimator.estimateRequirement(any(), any())).thenThrow(new RuntimeException("Error"));
+
+    RequestAllocatorConfig<StringRequest> configuration =
+        RequestAllocatorConfig.builder(failingEstimator)
+            .allowParallelization()
+            .withPrioritizer(new StringRequest.StringRequestComparator()).build();
+    PreOrderAllocator<StringRequest> allocator =
+        new PreOrderAllocator<>(configuration);
+
+    ResourcePool pool = ResourcePool.builder().maxResource(StringRequest.MEMORY, 100.).build();
+
+    List<Requestor<StringRequest>> requests = Lists.<Requestor<StringRequest>>newArrayList(
+        new StringRequestor("r1", "a-50", "f-50", "k-20"),
+        new StringRequestor("r2", "j-10", "b-20", "e-20"),
+        new StringRequestor("r3", "g-20", "c-200", "d-30"));
+
+    Assert.expectThrows(RuntimeException.class, () -> allocator.allocateRequests(requests.iterator(), pool));
+  }
+
 }
\ No newline at end of file