You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/02/10 19:03:15 UTC
[gobblin] branch master updated: [GOBBLIN-1604] Throw exception if there are no allocated requests due to lack of res… (#3461)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2804f10 [GOBBLIN-1604] Throw exception if there are no allocated requests due to lack of res… (#3461)
2804f10 is described below
commit 2804f10124241f85f54e51db513ff6831dd408b9
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu Feb 10 11:03:08 2022 -0800
[GOBBLIN-1604] Throw exception if there are no allocated requests due to lack of res… (#3461)
* Throw exception if there are no allocated requests due to lack of resources
* Fix typo
---
.../gobblin/data/management/copy/CopySource.java | 17 +++-
.../data/management/copy/CopySourceTest.java | 108 +++++++++++++++++++++
2 files changed, 124 insertions(+), 1 deletion(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 074b86a..175d0b3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -209,8 +209,9 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
Iterator<FileSet<CopyEntity>> prioritizedFileSets =
allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy());
- //Submit alertable events for unfulfilled requests
+ //Submit alertable events for unfulfilled requests and fail if all of the allocated requests were rejected due to size
submitUnfulfilledRequestEvents(allocator);
+ failJobIfAllRequestsRejected(allocator, prioritizedFileSets);
String filesetWuGeneratorAlias = state.getProp(ConfigurationKeys.COPY_SOURCE_FILESET_WU_GENERATOR_CLASS, FileSetWorkUnitGenerator.class.getName());
Iterator<Callable<Void>> callableIterator =
@@ -291,6 +292,20 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
}
}
+ void failJobIfAllRequestsRejected(RequestAllocator<FileSet<CopyEntity>> allocator,
+ Iterator<FileSet<CopyEntity>> allocatedRequests) throws IOException {
+ // TODO: we should set job as partial success if there is a mix of allocated requests and rejections
+ if (PriorityIterableBasedRequestAllocator.class.isAssignableFrom(allocator.getClass())) {
+ PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> priorityIterableBasedRequestAllocator =
+ (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) allocator;
+ // If there are no allocated items and there are items exceeding the available resources, then we can infer all items exceed resources
+ if (!allocatedRequests.hasNext() && priorityIterableBasedRequestAllocator.getRequestsExceedingAvailableResourcePool().size() > 0) {
+ throw new IOException(String.format("Requested copy datasets are all larger than the available resource pool. Try increasing %s and/or %s",
+ CopyConfiguration.MAX_COPY_PREFIX + "." + CopyResourcePool.ENTITIES_KEY, CopyConfiguration.MAX_COPY_PREFIX + ".size"));
+ }
+ }
+ }
+
private void submitUnfulfilledRequestEvents(RequestAllocator<FileSet<CopyEntity>> allocator) {
if (PriorityIterableBasedRequestAllocator.class.isAssignableFrom(allocator.getClass())) {
PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> priorityIterableBasedRequestAllocator =
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
index 102d6d1..68f683c 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
@@ -196,6 +196,114 @@ public class CopySourceTest {
Assert.assertEquals(fileSet.getTotalSizeInBytes(), 75);
}
+ @Test(expectedExceptions = IOException.class)
+ public void testFailIfAllAllocationRequestsRejected()
+ throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ SourceState state = new SourceState();
+
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+ state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+ state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir");
+ state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+ TestCopyablePartitionableDatasedFinder.class.getCanonicalName());
+ state.setProp(CopySource.MAX_CONCURRENT_LISTING_SERVICES, 2);
+ state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".size", "50");
+ state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".copyEntities", "2");
+ state.setProp(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY,
+ RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name().toLowerCase());
+ state.setProp(ConfigurationKeys.METRICS_CUSTOM_BUILDERS, "org.apache.gobblin.metrics.ConsoleEventReporterFactory");
+
+ CopySource source = new CopySource();
+
+ final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
+ final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
+
+ int maxThreads = state
+ .getPropAsInt(CopySource.MAX_CONCURRENT_LISTING_SERVICES, CopySource.DEFAULT_MAX_CONCURRENT_LISTING_SERVICES);
+
+ final CopyConfiguration copyConfiguration = CopyConfiguration.builder(targetFs, state.getProperties()).build();
+
+ MetricContext metricContext = Instrumented.getMetricContext(state, CopySource.class);
+ EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext, CopyConfiguration.COPY_PREFIX).build();
+ DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils
+ .instantiateDatasetFinder(state.getProperties(), sourceFs, CopySource.DEFAULT_DATASET_PROFILE_CLASS_KEY,
+ eventSubmitter, state);
+
+ IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder =
+ datasetFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder<CopyableDatasetBase>) datasetFinder
+ : new IterableDatasetFinderImpl<>(datasetFinder);
+
+ Iterator<CopyableDatasetRequestor> requesterIteratorWithNulls = Iterators
+ .transform(iterableDatasetFinder.getDatasetsIterator(),
+ new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, log));
+ Iterator<CopyableDatasetRequestor> requesterIterator =
+ Iterators.filter(requesterIteratorWithNulls, Predicates.<CopyableDatasetRequestor>notNull());
+
+ Method m = CopySource.class.getDeclaredMethod("createRequestAllocator", CopyConfiguration.class, int.class);
+ m.setAccessible(true);
+ PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> allocator =
+ (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) m.invoke(source, copyConfiguration, maxThreads);
+ Iterator<FileSet<CopyEntity>> prioritizedFileSets =
+ allocator.allocateRequests(requesterIterator, copyConfiguration.getMaxToCopy());
+ List<FileSet<CopyEntity>> fileSetList = allocator.getRequestsExceedingAvailableResourcePool();
+ Assert.assertEquals(fileSetList.size(), 2);
+ source.failJobIfAllRequestsRejected(allocator, prioritizedFileSets);
+ }
+
+ @Test
+ public void testPassIfNoAllocationsRejected()
+ throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ SourceState state = new SourceState();
+
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+ state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+ state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir");
+ state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+ TestCopyablePartitionableDatasedFinder.class.getCanonicalName());
+ state.setProp(CopySource.MAX_CONCURRENT_LISTING_SERVICES, 2);
+ state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".size", "100");
+ state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".copyEntities", "10");
+ state.setProp(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY,
+ RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name().toLowerCase());
+ state.setProp(ConfigurationKeys.METRICS_CUSTOM_BUILDERS, "org.apache.gobblin.metrics.ConsoleEventReporterFactory");
+
+ CopySource source = new CopySource();
+
+ final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
+ final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
+
+ int maxThreads = state
+ .getPropAsInt(CopySource.MAX_CONCURRENT_LISTING_SERVICES, CopySource.DEFAULT_MAX_CONCURRENT_LISTING_SERVICES);
+
+ final CopyConfiguration copyConfiguration = CopyConfiguration.builder(targetFs, state.getProperties()).build();
+
+ MetricContext metricContext = Instrumented.getMetricContext(state, CopySource.class);
+ EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext, CopyConfiguration.COPY_PREFIX).build();
+ DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils
+ .instantiateDatasetFinder(state.getProperties(), sourceFs, CopySource.DEFAULT_DATASET_PROFILE_CLASS_KEY,
+ eventSubmitter, state);
+
+ IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder =
+ datasetFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder<CopyableDatasetBase>) datasetFinder
+ : new IterableDatasetFinderImpl<>(datasetFinder);
+
+ Iterator<CopyableDatasetRequestor> requesterIteratorWithNulls = Iterators
+ .transform(iterableDatasetFinder.getDatasetsIterator(),
+ new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, log));
+ Iterator<CopyableDatasetRequestor> requesterIterator =
+ Iterators.filter(requesterIteratorWithNulls, Predicates.<CopyableDatasetRequestor>notNull());
+
+ Method m = CopySource.class.getDeclaredMethod("createRequestAllocator", CopyConfiguration.class, int.class);
+ m.setAccessible(true);
+ PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> allocator =
+ (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) m.invoke(source, copyConfiguration, maxThreads);
+ Iterator<FileSet<CopyEntity>> prioritizedFileSets =
+ allocator.allocateRequests(requesterIterator, copyConfiguration.getMaxToCopy());
+ List<FileSet<CopyEntity>> fileSetList = allocator.getRequestsExceedingAvailableResourcePool();
+ Assert.assertEquals(fileSetList.size(), 0);
+ source.failJobIfAllRequestsRejected(allocator, prioritizedFileSets);
+ }
+
@Test
public void testDefaultHiveDatasetShardTempPaths()
throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {