You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/07/13 00:36:57 UTC
[gobblin] branch master updated: throw failure with iteratorexecutor when allocating files (#3321)
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4edf10c throw failure with iteratorexecutor when allocating files (#3321)
4edf10c is described below
commit 4edf10c3e112c213aa3dcb3299470eadd1cd61ce
Author: William Lo <lo...@gmail.com>
AuthorDate: Mon Jul 12 17:36:48 2021 -0700
throw failure with iteratorexecutor when allocating files (#3321)
---
.../PriorityIterableBasedRequestAllocator.java | 3 ++-
.../BruteForceAllocatorTest.java | 23 ++++++++++++++++++++++
.../request_allocation/GreedyAllocatorTest.java | 22 +++++++++++++++++++++
.../request_allocation/PreOrderAllocatorTest.java | 23 ++++++++++++++++++++++
4 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
index f5be73e..be58e6d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
@@ -91,7 +91,8 @@ public abstract class PriorityIterableBasedRequestAllocator<T extends Request<T>
try {
List<Either<Void, ExecutionException>> results = executor.executeAndGetResults();
- IteratorExecutor.logFailures(results, log, 10);
+ // Throw runtime failure if an exception occurs during execution to fail the job
+ IteratorExecutor.logAndThrowFailures(results, log, 10);
} catch (InterruptedException ie) {
log.error("Request allocation was interrupted.");
return new AllocatedRequestsIteratorBase<>(
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/BruteForceAllocatorTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/BruteForceAllocatorTest.java
index 1243cd1..f5e7f20 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/BruteForceAllocatorTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/BruteForceAllocatorTest.java
@@ -24,6 +24,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
+import static org.mockito.Mockito.*;
public class BruteForceAllocatorTest {
@Test
@@ -53,4 +54,26 @@ public class BruteForceAllocatorTest {
Assert.assertEquals(resultList.get(2).getString(), "d-30");
Assert.assertEquals(resultList.get(3).getString(), "e-20");
}
+
+ @Test
+ public void testThrowExceptionOnFailure() throws Exception {
+ ResourceEstimator<StringRequest> failingEstimator = mock(ResourceEstimator.class);
+ when(failingEstimator.estimateRequirement(any(), any())).thenThrow(new RuntimeException("Error"));
+
+ RequestAllocatorConfig<StringRequest> configuration =
+ RequestAllocatorConfig.builder(failingEstimator)
+ .allowParallelization()
+ .withPrioritizer(new StringRequest.StringRequestComparator()).build();
+ BruteForceAllocator<StringRequest> allocator =
+ new BruteForceAllocator<>(configuration);
+
+ ResourcePool pool = ResourcePool.builder().maxResource(StringRequest.MEMORY, 100.).build();
+
+ List<Requestor<StringRequest>> requests = Lists.<Requestor<StringRequest>>newArrayList(
+ new StringRequestor("r1", "a-50", "f-50", "k-20"),
+ new StringRequestor("r2", "j-10", "b-20", "e-20"),
+ new StringRequestor("r3", "g-20", "c-200", "d-30"));
+
+ Assert.expectThrows(RuntimeException.class, () -> allocator.allocateRequests(requests.iterator(), pool));
+ }
}
\ No newline at end of file
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/GreedyAllocatorTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/GreedyAllocatorTest.java
index c1d52d6..ac293d3 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/GreedyAllocatorTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/GreedyAllocatorTest.java
@@ -26,6 +26,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import static org.mockito.Mockito.*;
public class GreedyAllocatorTest {
@@ -58,4 +59,25 @@ public class GreedyAllocatorTest {
})), Sets.newHashSet("a-50", "f-50"));
}
+ @Test
+ public void testThrowExceptionOnFailure() throws Exception {
+ ResourceEstimator<StringRequest> failingEstimator = mock(ResourceEstimator.class);
+ when(failingEstimator.estimateRequirement(any(), any())).thenThrow(new RuntimeException("Error"));
+
+ RequestAllocatorConfig<StringRequest> configuration =
+ RequestAllocatorConfig.builder(failingEstimator)
+ .allowParallelization()
+ .withPrioritizer(new StringRequest.StringRequestComparator()).build();
+ GreedyAllocator<StringRequest> allocator = new GreedyAllocator<>(configuration);
+
+ ResourcePool pool = ResourcePool.builder().maxResource(StringRequest.MEMORY, 100.).build();
+
+ List<Requestor<StringRequest>> requests = Lists.<Requestor<StringRequest>>newArrayList(
+ new StringRequestor("r1", "a-50", "f-50", "k-20"),
+ new StringRequestor("r2", "j-10", "b-20", "e-20"),
+ new StringRequestor("r3", "g-20", "c-200", "d-30"));
+
+ Assert.expectThrows(RuntimeException.class, () -> allocator.allocateRequests(requests.iterator(), pool));
+ }
+
}
\ No newline at end of file
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/PreOrderAllocatorTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/PreOrderAllocatorTest.java
index 38d5d99..a27970a 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/PreOrderAllocatorTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/PreOrderAllocatorTest.java
@@ -24,6 +24,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
+import static org.mockito.Mockito.*;
public class PreOrderAllocatorTest {
@@ -61,4 +62,26 @@ public class PreOrderAllocatorTest {
Assert.assertFalse(estimator.getQueriedRequests().contains("f-50"));
}
+ @Test
+ public void testThrowExceptionOnFailure() throws Exception {
+ ResourceEstimator<StringRequest> failingEstimator = mock(ResourceEstimator.class);
+ when(failingEstimator.estimateRequirement(any(), any())).thenThrow(new RuntimeException("Error"));
+
+ RequestAllocatorConfig<StringRequest> configuration =
+ RequestAllocatorConfig.builder(failingEstimator)
+ .allowParallelization()
+ .withPrioritizer(new StringRequest.StringRequestComparator()).build();
+ PreOrderAllocator<StringRequest> allocator =
+ new PreOrderAllocator<>(configuration);
+
+ ResourcePool pool = ResourcePool.builder().maxResource(StringRequest.MEMORY, 100.).build();
+
+ List<Requestor<StringRequest>> requests = Lists.<Requestor<StringRequest>>newArrayList(
+ new StringRequestor("r1", "a-50", "f-50", "k-20"),
+ new StringRequestor("r2", "j-10", "b-20", "e-20"),
+ new StringRequestor("r3", "g-20", "c-200", "d-30"));
+
+ Assert.expectThrows(RuntimeException.class, () -> allocator.allocateRequests(requests.iterator(), pool));
+ }
+
}
\ No newline at end of file