You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink and is not preserved in this text version.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/02/10 19:03:15 UTC

[gobblin] branch master updated: [GOBBLIN-1604] Throw exception if there are no allocated requests due to lack of resources (#3461)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2804f10  [GOBBLIN-1604] Throw exception if there are no allocated requests due to lack of res… (#3461)
2804f10 is described below

commit 2804f10124241f85f54e51db513ff6831dd408b9
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu Feb 10 11:03:08 2022 -0800

    [GOBBLIN-1604] Throw exception if there are no allocated requests due to lack of res… (#3461)
    
    * Throw exception if there are no allocated requests due to lack of resources
    
    * Fix typo
---
 .../gobblin/data/management/copy/CopySource.java   |  17 +++-
 .../data/management/copy/CopySourceTest.java       | 108 +++++++++++++++++++++
 2 files changed, 124 insertions(+), 1 deletion(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 074b86a..175d0b3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -209,8 +209,9 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
       Iterator<FileSet<CopyEntity>> prioritizedFileSets =
           allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy());
 
-      //Submit alertable events for unfulfilled requests
+      // Submit alertable events for unfulfilled requests, and fail the job if nothing was allocated because requests exceeded the available resource pool
       submitUnfulfilledRequestEvents(allocator);
+      failJobIfAllRequestsRejected(allocator, prioritizedFileSets);
 
       String filesetWuGeneratorAlias = state.getProp(ConfigurationKeys.COPY_SOURCE_FILESET_WU_GENERATOR_CLASS, FileSetWorkUnitGenerator.class.getName());
       Iterator<Callable<Void>> callableIterator =
@@ -291,6 +292,36 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
     }
   }
 
+  /**
+   * Fails the job when no copy request was allocated and at least one request was rejected for exceeding the
+   * available resource pool.
+   *
+   * <p>The check only applies to a {@link PriorityIterableBasedRequestAllocator}, which records the requests that
+   * exceeded the pool; for any other {@link RequestAllocator} this method does nothing.
+   *
+   * @param allocator the allocator that produced {@code allocatedRequests}
+   * @param allocatedRequests iterator over the allocated requests; only {@link Iterator#hasNext()} is called on it,
+   *     so the caller can still consume every element afterwards
+   * @throws IOException if nothing was allocated and the allocator reports requests exceeding the resource pool
+   */
+  void failJobIfAllRequestsRejected(RequestAllocator<FileSet<CopyEntity>> allocator,
+      Iterator<FileSet<CopyEntity>> allocatedRequests) throws IOException {
+    // TODO: we should set job as partial success if there is a mix of allocated requests and rejections
+    if (PriorityIterableBasedRequestAllocator.class.isAssignableFrom(allocator.getClass())) {
+      PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> priorityIterableBasedRequestAllocator =
+          (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) allocator;
+      // If nothing was allocated but some requests exceeded the available resource pool, we infer that all requests
+      // exceeded it.
+      // NOTE(review): the exceeding-requests list may only be populated when rejected requests are stored (the tests
+      // set STORE_REJECTED_REQUESTS_KEY to ALL); confirm the check still fires with other settings.
+      if (!allocatedRequests.hasNext()
+          && !priorityIterableBasedRequestAllocator.getRequestsExceedingAvailableResourcePool().isEmpty()) {
+        throw new IOException(String.format("Requested copy datasets are all larger than the available resource pool. Try increasing %s and/or %s",
+            CopyConfiguration.MAX_COPY_PREFIX + "." + CopyResourcePool.ENTITIES_KEY, CopyConfiguration.MAX_COPY_PREFIX + ".size"));
+      }
+    }
+  }
+
   private void submitUnfulfilledRequestEvents(RequestAllocator<FileSet<CopyEntity>> allocator) {
     if (PriorityIterableBasedRequestAllocator.class.isAssignableFrom(allocator.getClass())) {
       PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> priorityIterableBasedRequestAllocator =
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
index 102d6d1..68f683c 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
@@ -196,6 +196,86 @@ public class CopySourceTest {
     Assert.assertEquals(fileSet.getTotalSizeInBytes(), 75);
   }
 
+  @Test(expectedExceptions = IOException.class,
+      expectedExceptionsMessageRegExp = "Requested copy datasets are all larger than the available resource pool.*")
+  public void testFailIfAllAllocationRequestsRejected()
+      throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    // Both requested file sets are expected to exceed a pool limited to size 50 and 2 entities. The message regexp
+    // makes sure the test passes only on the rejection failure, not on an unrelated IOException thrown during setup.
+    allocateAndCheckRejections("50", "2", 2);
+  }
+
+  @Test
+  public void testPassIfNoAllocationsRejected()
+      throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    // A pool limited to size 100 and 10 entities is expected to have no requests exceeding it.
+    allocateAndCheckRejections("100", "10", 0);
+  }
+
+  /**
+   * Creates the request allocator {@code CopySource} would use for the given copy limits, allocates the datasets
+   * found by {@code TestCopyablePartitionableDatasedFinder}, asserts how many requests exceeded the resource pool,
+   * and then calls {@code CopySource#failJobIfAllRequestsRejected}.
+   *
+   * @param maxCopySize value for {@code CopyConfiguration.MAX_COPY_PREFIX + ".size"}
+   * @param maxCopyEntities value for {@code CopyConfiguration.MAX_COPY_PREFIX + ".copyEntities"}
+   * @param expectedRejectedCount expected size of {@code getRequestsExceedingAvailableResourcePool()}
+   * @throws IOException if {@code failJobIfAllRequestsRejected} fails the job, or if setup throws one
+   */
+  private void allocateAndCheckRejections(String maxCopySize, String maxCopyEntities, int expectedRejectedCount)
+      throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    SourceState state = new SourceState();
+
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+    state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+    state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir");
+    state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+        TestCopyablePartitionableDatasedFinder.class.getCanonicalName());
+    state.setProp(CopySource.MAX_CONCURRENT_LISTING_SERVICES, 2);
+    state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".size", maxCopySize);
+    state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".copyEntities", maxCopyEntities);
+    state.setProp(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY,
+        RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name().toLowerCase());
+    state.setProp(ConfigurationKeys.METRICS_CUSTOM_BUILDERS, "org.apache.gobblin.metrics.ConsoleEventReporterFactory");
+
+    CopySource source = new CopySource();
+
+    final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
+    final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
+
+    int maxThreads = state
+        .getPropAsInt(CopySource.MAX_CONCURRENT_LISTING_SERVICES, CopySource.DEFAULT_MAX_CONCURRENT_LISTING_SERVICES);
+
+    final CopyConfiguration copyConfiguration = CopyConfiguration.builder(targetFs, state.getProperties()).build();
+
+    MetricContext metricContext = Instrumented.getMetricContext(state, CopySource.class);
+    EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext, CopyConfiguration.COPY_PREFIX).build();
+    DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils
+        .instantiateDatasetFinder(state.getProperties(), sourceFs, CopySource.DEFAULT_DATASET_PROFILE_CLASS_KEY,
+            eventSubmitter, state);
+
+    IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder =
+        datasetFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder<CopyableDatasetBase>) datasetFinder
+            : new IterableDatasetFinderImpl<>(datasetFinder);
+
+    Iterator<CopyableDatasetRequestor> requesterIteratorWithNulls = Iterators
+        .transform(iterableDatasetFinder.getDatasetsIterator(),
+            new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, log));
+    Iterator<CopyableDatasetRequestor> requesterIterator =
+        Iterators.filter(requesterIteratorWithNulls, Predicates.<CopyableDatasetRequestor>notNull());
+
+    // createRequestAllocator is reached reflectively (with setAccessible) to get the allocator CopySource would build.
+    Method m = CopySource.class.getDeclaredMethod("createRequestAllocator", CopyConfiguration.class, int.class);
+    m.setAccessible(true);
+    PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> allocator =
+        (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) m.invoke(source, copyConfiguration, maxThreads);
+    Iterator<FileSet<CopyEntity>> prioritizedFileSets =
+        allocator.allocateRequests(requesterIterator, copyConfiguration.getMaxToCopy());
+    List<FileSet<CopyEntity>> fileSetList = allocator.getRequestsExceedingAvailableResourcePool();
+    Assert.assertEquals(fileSetList.size(), expectedRejectedCount);
+    source.failJobIfAllRequestsRejected(allocator, prioritizedFileSets);
+  }
+
   @Test
   public void testDefaultHiveDatasetShardTempPaths()
       throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {