You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:44 UTC
[26/50] incubator-gobblin git commit: [GOBBLIN-379] Submit an event when DistCp job resource requirements exceed a hard bound
[GOBBLIN-379] Submit an event when DistCp job resource requirements exceed a hard bound
Closes #2257 from sv2000/gobblin-379
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/457ede26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/457ede26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/457ede26
Branch: refs/heads/0.12.0
Commit: 457ede26da2c7f693d4fe321d4c50b0e25e0d22d
Parents: bde5bb1
Author: suvasude <su...@linkedin.biz>
Authored: Thu Feb 8 11:37:21 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Feb 8 11:37:21 2018 -0800
----------------------------------------------------------------------
.../data/management/copy/CopyConfiguration.java | 17 ++-
.../data/management/copy/CopySource.java | 105 ++++++++++++++-----
.../data/management/copy/CopySourceTest.java | 84 +++++++++++++++
.../ConcurrentBoundedPriorityIterable.java | 80 +++++++++-----
.../PriorityIterableBasedRequestAllocator.java | 71 +++++++++----
.../RequestAllocatorConfig.java | 21 +++-
.../ConcurrentBoundedPriorityIterableTest.java | 21 ++--
7 files changed, 309 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
index 211ad13..c4d07e2 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
@@ -23,6 +23,8 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import org.apache.gobblin.util.request_allocation.ConcurrentBoundedPriorityIterable;
+import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -65,6 +67,13 @@ public class CopyConfiguration {
public static final String ABORT_ON_SINGLE_DATASET_FAILURE = COPY_PREFIX + ".abortOnSingleDatasetFailure";
+ /*
+ * Config controlling which classes of rejected requests are stored. Possible values are "all", "none", or "min" (default).
+ */
+ public static final String STORE_REJECTED_REQUESTS_KEY = COPY_PREFIX + ".store.rejected.requests";
+ public static final String DEFAULT_STORE_REJECTED_REQUESTS =
+ RequestAllocatorConfig.StoreRejectedRequestsConfig.MIN.name();
+
/**
* User supplied directory where files should be published. This value is identical for all datasets in the distcp job.
*/
@@ -81,6 +90,7 @@ public class CopyConfiguration {
private final FileSystem targetFs;
private final Optional<FileSetComparator> prioritizer;
private final ResourcePool maxToCopy;
+ private final String storeRejectedRequestsSetting;
private final Config config;
@@ -114,8 +124,8 @@ public class CopyConfiguration {
if (properties.containsKey(PRIORITIZER_ALIAS_KEY)) {
try {
this.prioritizer = Optional.of(GobblinConstructorUtils.<FileSetComparator>invokeLongestConstructor(
- new ClassAliasResolver(FileSetComparator.class).resolveClass(
- properties.getProperty(PRIORITIZER_ALIAS_KEY)), properties));
+ new ClassAliasResolver(FileSetComparator.class)
+ .resolveClass(properties.getProperty(PRIORITIZER_ALIAS_KEY)), properties));
} catch (ReflectiveOperationException roe) {
throw new RuntimeException("Could not build prioritizer.", roe);
}
@@ -124,6 +134,9 @@ public class CopyConfiguration {
}
this.maxToCopy = CopyResourcePool.fromConfig(ConfigUtils.getConfigOrEmpty(this.config, MAX_COPY_PREFIX));
+ this.storeRejectedRequestsSetting =
+ properties.getProperty(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY, DEFAULT_STORE_REJECTED_REQUESTS);
+
this.abortOnSingleDatasetFailure = false;
if (this.config.hasPath(ABORT_ON_SINGLE_DATASET_FAILURE)) {
this.abortOnSingleDatasetFailure = this.config.getBoolean(ABORT_ON_SINGLE_DATASET_FAILURE);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 615d6ad..3355f3d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -29,6 +29,9 @@ import java.util.concurrent.Future;
import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +40,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Predicates;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
@@ -62,6 +66,7 @@ import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.dataset.IterableDatasetFinderImpl;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -84,13 +89,11 @@ import org.apache.gobblin.util.guid.Guid;
import org.apache.gobblin.util.request_allocation.GreedyAllocator;
import org.apache.gobblin.util.request_allocation.HierarchicalAllocator;
import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer;
+import org.apache.gobblin.util.request_allocation.PriorityIterableBasedRequestAllocator;
import org.apache.gobblin.util.request_allocation.RequestAllocator;
import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
/**
* {@link org.apache.gobblin.source.Source} that generates work units from {@link org.apache.gobblin.data.management.copy.CopyableDataset}s.
@@ -112,11 +115,21 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
public static final String SIMULATE = CopyConfiguration.COPY_PREFIX + ".simulate";
public static final String MAX_SIZE_MULTI_WORKUNITS = CopyConfiguration.COPY_PREFIX + ".binPacking.maxSizePerBin";
public static final String MAX_WORK_UNITS_PER_BIN = CopyConfiguration.COPY_PREFIX + ".binPacking.maxWorkUnitsPerBin";
+ public static final String REQUESTS_EXCEEDING_AVAILABLE_RESOURCE_POOL_EVENT_NAME =
+ "RequestsExceedingAvailableResourcePoolEvent";
+ public static final String REQUESTS_DROPPED_EVENT_NAME = "RequestsDroppedEvent";
+ public static final String REQUESTS_REJECTED_DUE_TO_INSUFFICIENT_EVICTION_EVENT_NAME =
+ "RequestsRejectedDueToInsufficientEvictionEvent";
+ public static final String REQUESTS_REJECTED_WITH_LOW_PRIORITY_EVENT_NAME = "RequestsRejectedWithLowPriorityEvent";
+ public static final String FILESET_NAME = "fileset.name";
+ public static final String FILESET_TOTAL_ENTITIES = "fileset.total.entities";
+ public static final String FILESET_TOTAL_SIZE_IN_BYTES = "fileset.total.size";
private static final String WORK_UNIT_WEIGHT = CopyConfiguration.COPY_PREFIX + ".workUnitWeight";
private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT);
public MetricContext metricContext;
+ public EventSubmitter eventSubmitter;
protected Optional<LineageInfo> lineageInfo;
@@ -145,16 +158,17 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
try {
- DeprecationUtils.renameDeprecatedKeys(state, CopyConfiguration.MAX_COPY_PREFIX + "." + CopyResourcePool.ENTITIES_KEY,
- Lists.newArrayList(MAX_FILES_COPIED_KEY));
+ DeprecationUtils
+ .renameDeprecatedKeys(state, CopyConfiguration.MAX_COPY_PREFIX + "." + CopyResourcePool.ENTITIES_KEY,
+ Lists.newArrayList(MAX_FILES_COPIED_KEY));
final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
state.setProp(SlaEventKeys.SOURCE_URI, sourceFs.getUri());
state.setProp(SlaEventKeys.DESTINATION_URI, targetFs.getUri());
- log.info("Identified source file system at {} and target file system at {}.",
- sourceFs.getUri(), targetFs.getUri());
+ log.info("Identified source file system at {} and target file system at {}.", sourceFs.getUri(),
+ targetFs.getUri());
long maxSizePerBin = state.getPropAsLong(MAX_SIZE_MULTI_WORKUNITS, 0);
long maxWorkUnitsPerMultiWorkUnit = state.getPropAsLong(MAX_WORK_UNITS_PER_BIN, 50);
@@ -165,26 +179,31 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
final CopyConfiguration copyConfiguration = CopyConfiguration.builder(targetFs, state.getProperties()).build();
+ this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, CopyConfiguration.COPY_PREFIX).build();
DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils
.instantiateDatasetFinder(state.getProperties(), sourceFs, DEFAULT_DATASET_PROFILE_CLASS_KEY,
- new EventSubmitter.Builder(this.metricContext, CopyConfiguration.COPY_PREFIX).build(), state);
+ this.eventSubmitter, state);
IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder =
datasetFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder<CopyableDatasetBase>) datasetFinder
: new IterableDatasetFinderImpl<>(datasetFinder);
- Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls =
- Iterators.transform(iterableDatasetFinder.getDatasetsIterator(),
+ Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls = Iterators
+ .transform(iterableDatasetFinder.getDatasetsIterator(),
new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, log));
- Iterator<CopyableDatasetRequestor> requestorIterator = Iterators.filter(requestorIteratorWithNulls,
- Predicates.<CopyableDatasetRequestor>notNull());
+ Iterator<CopyableDatasetRequestor> requestorIterator =
+ Iterators.filter(requestorIteratorWithNulls, Predicates.<CopyableDatasetRequestor>notNull());
final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitsMap =
Multimaps.<FileSet<CopyEntity>, WorkUnit>synchronizedSetMultimap(
HashMultimap.<FileSet<CopyEntity>, WorkUnit>create());
RequestAllocator<FileSet<CopyEntity>> allocator = createRequestAllocator(copyConfiguration, maxThreads);
- Iterator<FileSet<CopyEntity>> prioritizedFileSets = allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy());
+ Iterator<FileSet<CopyEntity>> prioritizedFileSets =
+ allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy());
+
+ //Submit alertable events for unfulfilled requests
+ submitUnfulfilledRequestEvents(allocator);
Iterator<Callable<Void>> callableIterator =
Iterators.transform(prioritizedFileSets, new Function<FileSet<CopyEntity>, Callable<Void>>() {
@@ -197,8 +216,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
});
try {
- List<Future<Void>> futures = new IteratorExecutor<>(callableIterator,
- maxThreads,
+ List<Future<Void>> futures = new IteratorExecutor<>(callableIterator, maxThreads,
ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), Optional.of("Copy-file-listing-pool-%d")))
.execute();
@@ -231,8 +249,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
return Lists.newArrayList();
}
- List<? extends WorkUnit> workUnits =
- new WorstFitDecreasingBinPacking(maxSizePerBin).pack(Lists.newArrayList(workUnitsMap.values()), this.weighter);
+ List<? extends WorkUnit> workUnits = new WorstFitDecreasingBinPacking(maxSizePerBin)
+ .pack(Lists.newArrayList(workUnitsMap.values()), this.weighter);
log.info(String.format(
"Bin packed work units. Initial work units: %d, packed work units: %d, max weight per bin: %d, "
+ "max work units per bin: %d.", workUnitsMap.size(), workUnits.size(), maxSizePerBin,
@@ -243,11 +261,42 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
}
}
- private RequestAllocator<FileSet<CopyEntity>> createRequestAllocator(CopyConfiguration copyConfiguration, int maxThreads) {
- Optional<FileSetComparator> prioritizer = copyConfiguration.getPrioritizer();
+ private void submitUnfulfilledRequestEventsHelper(List<FileSet<CopyEntity>> fileSetList, String eventName) {
+ for (FileSet<CopyEntity> fileSet : fileSetList) {
+ GobblinTrackingEvent event =
+ GobblinTrackingEvent.newBuilder().setName(eventName).setNamespace(CopySource.class.getName()).setMetadata(
+ ImmutableMap.<String, String>builder()
+ .put(ConfigurationKeys.DATASET_URN_KEY, fileSet.getDataset().getUrn())
+ .put(FILESET_TOTAL_ENTITIES, Integer.toString(fileSet.getTotalEntities()))
+ .put(FILESET_TOTAL_SIZE_IN_BYTES, Long.toString(fileSet.getTotalSizeInBytes()))
+ .put(FILESET_NAME, fileSet.getName()).build()).build();
+ this.metricContext.submitEvent(event);
+ }
+ }
+ private void submitUnfulfilledRequestEvents(RequestAllocator<FileSet<CopyEntity>> allocator) {
+ if (PriorityIterableBasedRequestAllocator.class.isAssignableFrom(allocator.getClass())) {
+ PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> priorityIterableBasedRequestAllocator =
+ (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) allocator;
+ submitUnfulfilledRequestEventsHelper(
+ priorityIterableBasedRequestAllocator.getRequestsExceedingAvailableResourcePool(),
+ REQUESTS_EXCEEDING_AVAILABLE_RESOURCE_POOL_EVENT_NAME);
+ submitUnfulfilledRequestEventsHelper(
+ priorityIterableBasedRequestAllocator.getRequestsRejectedDueToInsufficientEviction(),
+ REQUESTS_REJECTED_DUE_TO_INSUFFICIENT_EVICTION_EVENT_NAME);
+ submitUnfulfilledRequestEventsHelper(priorityIterableBasedRequestAllocator.getRequestsRejectedWithLowPriority(),
+ REQUESTS_REJECTED_WITH_LOW_PRIORITY_EVENT_NAME);
+ submitUnfulfilledRequestEventsHelper(priorityIterableBasedRequestAllocator.getRequestsDropped(),
+ REQUESTS_DROPPED_EVENT_NAME);
+ }
+ }
+
+ private RequestAllocator<FileSet<CopyEntity>> createRequestAllocator(CopyConfiguration copyConfiguration,
+ int maxThreads) {
+ Optional<FileSetComparator> prioritizer = copyConfiguration.getPrioritizer();
RequestAllocatorConfig.Builder<FileSet<CopyEntity>> configBuilder =
RequestAllocatorConfig.builder(new FileSetResourceEstimator()).allowParallelization(maxThreads)
+ .storeRejectedRequests(copyConfiguration.getStoreRejectedRequestsSetting())
.withLimitedScopeConfig(copyConfiguration.getPrioritizationConfig());
if (!prioritizer.isPresent()) {
@@ -323,9 +372,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
* a DatasetFinder. Consequently, the source and destination dataset for the CopyableFile lineage are expected
* to be set by the same logic
*/
- if (lineageInfo.isPresent() &&
- copyableFile.getSourceDataset() != null &&
- copyableFile.getDestinationDataset() != null) {
+ if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null
+ && copyableFile.getDestinationDataset() != null) {
lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit);
}
}
@@ -350,7 +398,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
return new EmptyExtractor<>("empty");
}
- protected Extractor<String, FileAwareInputStream> extractorForCopyableFile(FileSystem fs, CopyableFile cf, WorkUnitState state)
+ protected Extractor<String, FileAwareInputStream> extractorForCopyableFile(FileSystem fs, CopyableFile cf,
+ WorkUnitState state)
throws IOException {
return new FileAwareInputStreamExtractor(fs, cf, state);
}
@@ -365,7 +414,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
@Deprecated
protected FileSystem getSourceFileSystem(State state)
throws IOException {
- Configuration conf = HadoopUtils.getConfFromState(state, Optional.of(ConfigurationKeys.SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH));
+ Configuration conf =
+ HadoopUtils.getConfFromState(state, Optional.of(ConfigurationKeys.SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH));
String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);
return HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), conf), state);
}
@@ -456,11 +506,12 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
return CopyableDatasetMetadata.deserialize(state.getProp(SERIALIZED_COPYABLE_DATASET));
}
- private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator, CopyEntity copyEntity)
+ private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator,
+ CopyEntity copyEntity)
throws IOException {
- if (copyEntity instanceof CopyableFile) {
+ if (copyEntity instanceof CopyableFile) {
Optional<WatermarkInterval> watermarkIntervalOptional =
- CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile)copyEntity, watermarkGenerator);
+ CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile) copyEntity, watermarkGenerator);
if (watermarkIntervalOptional.isPresent()) {
workUnit.setWatermarkInterval(watermarkIntervalOptional.get());
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
index d9e5368..377b2cf 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
@@ -17,19 +17,41 @@
package org.apache.gobblin.data.management.copy;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Iterator;
import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
+import org.apache.hadoop.fs.FileSystem;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterators;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
+import org.apache.gobblin.data.management.partition.CopyableDatasetRequestor;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.dataset.IterableDatasetFinderImpl;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.request_allocation.PriorityIterableBasedRequestAllocator;
+@Slf4j
public class CopySourceTest {
@Test
@@ -106,4 +128,66 @@ public class CopySourceTest {
Assert.assertNotNull(extractBelow);
}
+ @Test
+ public void testSubmitUnfulfilledRequestEvents()
+ throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ SourceState state = new SourceState();
+
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+ state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+ state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir");
+ state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+ TestCopyablePartitionableDatasedFinder.class.getCanonicalName());
+ state.setProp(CopySource.MAX_CONCURRENT_LISTING_SERVICES, 2);
+ state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".size", "50");
+ state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".copyEntities", 2);
+ state.setProp(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY,
+ RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name().toLowerCase());
+ state.setProp(ConfigurationKeys.METRICS_CUSTOM_BUILDERS, "org.apache.gobblin.metrics.ConsoleEventReporterFactory");
+
+ CopySource source = new CopySource();
+
+ final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
+ final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
+
+ int maxThreads = state
+ .getPropAsInt(CopySource.MAX_CONCURRENT_LISTING_SERVICES, CopySource.DEFAULT_MAX_CONCURRENT_LISTING_SERVICES);
+
+ final CopyConfiguration copyConfiguration = CopyConfiguration.builder(targetFs, state.getProperties()).build();
+
+ MetricContext metricContext = Instrumented.getMetricContext(state, CopySource.class);
+ EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext, CopyConfiguration.COPY_PREFIX).build();
+ DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils
+ .instantiateDatasetFinder(state.getProperties(), sourceFs, CopySource.DEFAULT_DATASET_PROFILE_CLASS_KEY,
+ eventSubmitter, state);
+
+ IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder =
+ datasetFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder<CopyableDatasetBase>) datasetFinder
+ : new IterableDatasetFinderImpl<>(datasetFinder);
+
+ Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls = Iterators
+ .transform(iterableDatasetFinder.getDatasetsIterator(),
+ new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, log));
+ Iterator<CopyableDatasetRequestor> requestorIterator =
+ Iterators.filter(requestorIteratorWithNulls, Predicates.<CopyableDatasetRequestor>notNull());
+
+ Method m = CopySource.class.getDeclaredMethod("createRequestAllocator", CopyConfiguration.class, int.class);
+ m.setAccessible(true);
+ PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> allocator =
+ (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) m.invoke(source, copyConfiguration, maxThreads);
+ Iterator<FileSet<CopyEntity>> prioritizedFileSets =
+ allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy());
+ List<FileSet<CopyEntity>> fileSetList = allocator.getRequestsExceedingAvailableResourcePool();
+ Assert.assertEquals(fileSetList.size(), 2);
+
+ FileSet<CopyEntity> fileSet = fileSetList.get(0);
+ Assert.assertEquals(fileSet.getDataset().getUrn(), "/test");
+ Assert.assertEquals(fileSet.getTotalEntities(), 5);
+ Assert.assertEquals(fileSet.getTotalSizeInBytes(), 50);
+
+ fileSet = fileSetList.get(1);
+ Assert.assertEquals(fileSet.getDataset().getUrn(), "/test");
+ Assert.assertEquals(fileSet.getTotalEntities(), 5);
+ Assert.assertEquals(fileSet.getTotalSizeInBytes(), 50);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
index c1a4505..bcfcf29 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
@@ -22,15 +22,15 @@ import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
/**
* A concurrent bounded priority {@link Iterable}. Given a {@link ResourcePool}, a {@link ResourceEstimator}, and a
@@ -65,12 +65,24 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
private int requestsRefused = 0;
private int requestsEvicted = 0;
+ //These are for submitting alertable events
+ private String storeRejectedRequestsSetting;
+ @Getter
+ private List<T> requestsExceedingAvailableResourcePool = Lists.newArrayList();
+ @Getter
+ private List<T> requestsRejectedWithLowPriority = Lists.newArrayList();
+ @Getter
+ private List<T> requestsRejectedDueToInsufficientEviction = Lists.newArrayList();
+ @Getter
+ private List<T> requestsDropped = Lists.newArrayList();
+
// These are ResourceRequirements for temporary use to avoid instantiation costs
private final ResourceRequirement candidateRequirement;
private final ResourceRequirement tmpRequirement;
private final ResourceRequirement reuse;
- public ConcurrentBoundedPriorityIterable(final Comparator<? super T> prioritizer, ResourceEstimator<T> resourceEstimator, ResourcePool pool) {
+ public ConcurrentBoundedPriorityIterable(final Comparator<? super T> prioritizer,
+ ResourceEstimator<T> resourceEstimator, String storeRejectedRequestsSetting, ResourcePool pool) {
this.estimator = resourceEstimator;
this.resourcePool = pool;
@@ -79,6 +91,8 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
this.allDifferentComparator = new AllDifferentComparator();
this.elements = new TreeSet<>(this.allDifferentComparator);
+ this.storeRejectedRequestsSetting = storeRejectedRequestsSetting;
+
this.currentRequirement = this.resourcePool.getResourceRequirementBuilder().zero().build();
this.maxResourceRequirement = new ResourceRequirement(this.currentRequirement);
@@ -94,7 +108,8 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
*/
private class AllDifferentComparator implements Comparator<AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>> {
@Override
- public int compare(AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t1, AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t2) {
+ public int compare(AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t1,
+ AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t2) {
int providedComparison = ConcurrentBoundedPriorityIterable.this.comparator.compare(t1.getT(), t2.getT());
if (providedComparison != 0) {
return providedComparison;
@@ -111,12 +126,13 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
*/
public boolean add(T t) {
if (this.closed) {
- throw new RuntimeException(ConcurrentBoundedPriorityIterable.class.getSimpleName() + " is no longer accepting requests!");
+ throw new RuntimeException(
+ ConcurrentBoundedPriorityIterable.class.getSimpleName() + " is no longer accepting requests!");
}
- AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>
- newElement = new AllocatedRequestsIteratorBase.RequestWithResourceRequirement<>(t,
- this.estimator.estimateRequirement(t, this.resourcePool));
+ AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> newElement =
+ new AllocatedRequestsIteratorBase.RequestWithResourceRequirement<>(t,
+ this.estimator.estimateRequirement(t, this.resourcePool));
boolean addedWorkunits = addImpl(newElement);
if (!addedWorkunits) {
this.rejectedElement = true;
@@ -132,21 +148,30 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
if (this.resourcePool.exceedsHardBound(newElement.getResourceRequirement(), false)) {
// item does not fit even in empty pool
log.warn(String.format("Request %s is larger than the available resource pool. If the pool is not expanded, "
- + "it will never be selected. Request: %s.", newElement.getT(),
+ + "it will never be selected. Request: %s.", newElement.getT(),
this.resourcePool.stringifyRequirement(newElement.getResourceRequirement())));
+ if (!this.storeRejectedRequestsSetting
+ .equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.NONE.name())) {
+ this.requestsExceedingAvailableResourcePool.add(newElement.getT());
+ }
this.requestsRefused++;
return false;
}
- ResourceRequirement candidateRequirement =
- ResourceRequirement.add(this.currentRequirement, newElement.getResourceRequirement(), this.candidateRequirement);
+ ResourceRequirement candidateRequirement = ResourceRequirement
+ .add(this.currentRequirement, newElement.getResourceRequirement(), this.candidateRequirement);
if (this.resourcePool.exceedsHardBound(candidateRequirement, false)) {
if (this.comparator.compare(this.elements.last().getT(), newElement.getT()) <= 0) {
- log.debug("Request {} does not fit in resource pool and is lower priority than current lowest priority request. "
- + "Rejecting", newElement.getT());
+ log.debug(
+ "Request {} does not fit in resource pool and is lower priority than current lowest priority request. "
+ + "Rejecting", newElement.getT());
this.requestsRefused++;
+ if (this.storeRejectedRequestsSetting
+ .equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name())) {
+ this.requestsRejectedWithLowPriority.add(newElement.getT());
+ }
return false;
}
@@ -154,11 +179,15 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
this.currentRequirement.copyInto(this.tmpRequirement);
- for (AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> dropCandidate : this.elements.descendingSet()) {
+ for (AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> dropCandidate : this.elements
+ .descendingSet()) {
if (this.comparator.compare(dropCandidate.getT(), newElement.getT()) <= 0) {
- log.debug("Cannot evict enough requests to fit request {}. "
- + "Rejecting", newElement.getT());
+ log.debug("Cannot evict enough requests to fit request {}. " + "Rejecting", newElement.getT());
this.requestsRefused++;
+ if (this.storeRejectedRequestsSetting
+ .equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name())) {
+ this.requestsRejectedDueToInsufficientEviction.add(newElement.getT());
+ }
return false;
}
this.tmpRequirement.subtract(dropCandidate.getResourceRequirement());
@@ -172,6 +201,10 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
for (AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> drop : toDrop) {
log.debug("Evicting request {}.", drop.getT());
this.requestsEvicted++;
+ if (this.storeRejectedRequestsSetting
+ .equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name())) {
+ this.requestsDropped.add(drop.getT());
+ }
this.elements.remove(drop);
this.currentRequirement.subtract(drop.getResourceRequirement());
}
@@ -205,13 +238,13 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
StringBuilder messageBuilder = new StringBuilder("Statistics for ").
append(ConcurrentBoundedPriorityIterable.class.getSimpleName()).append(": {");
messageBuilder.append(this.resourcePool).append(", ");
- messageBuilder.append("totalResourcesUsed: ").append(this.resourcePool.stringifyRequirement(this.currentRequirement))
- .append(", ");
- messageBuilder.append("maxRequirementPerDimension: ").append(this.resourcePool.stringifyRequirement(this.maxResourceRequirement))
- .append(", ");
+ messageBuilder.append("totalResourcesUsed: ")
+ .append(this.resourcePool.stringifyRequirement(this.currentRequirement)).append(", ");
+ messageBuilder.append("maxRequirementPerDimension: ")
+ .append(this.resourcePool.stringifyRequirement(this.maxResourceRequirement)).append(", ");
messageBuilder.append("requestsOffered: ").append(this.requestsOffered).append(", ");
- messageBuilder.append("requestsAccepted: ").append(this.requestsOffered - this.requestsEvicted - this.requestsRefused)
- .append(", ");
+ messageBuilder.append("requestsAccepted: ")
+ .append(this.requestsOffered - this.requestsEvicted - this.requestsRefused).append(", ");
messageBuilder.append("requestsRefused: ").append(this.requestsRefused).append(", ");
messageBuilder.append("requestsEvicted: ").append(this.requestsEvicted);
messageBuilder.append("}");
@@ -228,5 +261,4 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
this.closed = true;
return this.elements.iterator();
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
index 876a4eb..f5be73e 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
@@ -22,6 +22,9 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import lombok.AccessLevel;
+import lombok.Getter;
+
import org.slf4j.Logger;
import com.google.common.base.Function;
@@ -32,22 +35,35 @@ import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-
-@AllArgsConstructor
public abstract class PriorityIterableBasedRequestAllocator<T extends Request<T>> implements RequestAllocator<T> {
private final Logger log;
@Getter(value = AccessLevel.PROTECTED)
private final RequestAllocatorConfig<T> configuration;
+ //These are for submitting alertable events
+ @Getter
+ private List<T> requestsExceedingAvailableResourcePool;
+ @Getter
+ private List<T> requestsRejectedWithLowPriority;
+ @Getter
+ private List<T> requestsRejectedDueToInsufficientEviction;
+ @Getter
+ private List<T> requestsDropped;
+
+ public PriorityIterableBasedRequestAllocator(Logger log, RequestAllocatorConfig<T> configuration) {
+ this.log = log;
+ this.configuration = configuration;
+ }
+
@Override
- public AllocatedRequestsIterator<T> allocateRequests(Iterator<? extends Requestor<T>> requestors, ResourcePool resourcePool) {
+ public AllocatedRequestsIterator<T> allocateRequests(Iterator<? extends Requestor<T>> requestors,
+ ResourcePool resourcePool) {
final ConcurrentBoundedPriorityIterable<T> iterable =
- new ConcurrentBoundedPriorityIterable<>(this.configuration.getPrioritizer(), this.configuration.getResourceEstimator(), resourcePool);
+ new ConcurrentBoundedPriorityIterable<>(this.configuration.getPrioritizer(),
+ this.configuration.getResourceEstimator(), this.configuration.getStoreRejectedRequestsSetting(),
+ resourcePool);
final Iterator<T> joinIterator = getJoinIterator(requestors, iterable);
@@ -57,36 +73,47 @@ public abstract class PriorityIterableBasedRequestAllocator<T extends Request<T>
}
} else {
- IteratorExecutor<Void> executor = new IteratorExecutor<>(Iterators.transform(joinIterator, new Function<T, Callable<Void>>() {
- @Override
- public Callable<Void> apply(final T input) {
- return new Callable<Void>() {
+ IteratorExecutor<Void> executor =
+ new IteratorExecutor<>(Iterators.transform(joinIterator, new Function<T, Callable<Void>>() {
@Override
- public Void call()
- throws Exception {
- iterable.add(input);
- return null;
+ public Callable<Void> apply(final T input) {
+ return new Callable<Void>() {
+ @Override
+ public Void call()
+ throws Exception {
+ iterable.add(input);
+ return null;
+ }
+ };
}
- };
- }
- }), this.configuration.getAllowedThreads(),
- ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("request-allocator-%d")));
+ }), this.configuration.getAllowedThreads(),
+ ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("request-allocator-%d")));
try {
List<Either<Void, ExecutionException>> results = executor.executeAndGetResults();
IteratorExecutor.logFailures(results, log, 10);
} catch (InterruptedException ie) {
log.error("Request allocation was interrupted.");
- return new AllocatedRequestsIteratorBase<>(Iterators.<AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>>emptyIterator(),
- resourcePool);
+ return new AllocatedRequestsIteratorBase<>(
+ Iterators.<AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>>emptyIterator(), resourcePool);
}
}
iterable.logStatistics(Optional.of(this.log));
+
+ //Get all requests rejected/dropped
+ getRejectedAndDroppedRequests(iterable);
+
return new AllocatedRequestsIteratorBase<>(iterable.iterator(), resourcePool);
}
+ public void getRejectedAndDroppedRequests(ConcurrentBoundedPriorityIterable<T> iterable) {
+ requestsExceedingAvailableResourcePool = iterable.getRequestsExceedingAvailableResourcePool();
+ requestsRejectedWithLowPriority = iterable.getRequestsRejectedWithLowPriority();
+ requestsRejectedDueToInsufficientEviction = iterable.getRequestsRejectedDueToInsufficientEviction();
+ requestsDropped = iterable.getRequestsDropped();
+ }
+
protected abstract Iterator<T> getJoinIterator(Iterator<? extends Requestor<T>> requestors,
ConcurrentBoundedPriorityIterable<T> requestIterable);
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
index b33070b..5a3051a 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
@@ -20,12 +20,12 @@ package org.apache.gobblin.util.request_allocation;
import java.io.Serializable;
import java.util.Comparator;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
import lombok.AllArgsConstructor;
import lombok.Getter;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
@AllArgsConstructor
@Getter
@@ -34,6 +34,11 @@ public class RequestAllocatorConfig<T extends Request<T>> {
private final ResourceEstimator<T> resourceEstimator;
private final int allowedThreads;
private Config limitedScopeConfig;
+ private String storeRejectedRequestsSetting;
+
+ public enum StoreRejectedRequestsConfig {
+ ALL, MIN, NONE
+ }
public static <T extends Request<T>> Builder<T> builder(ResourceEstimator<T> resourceEstimator) {
return new Builder<>(resourceEstimator);
@@ -44,6 +49,7 @@ public class RequestAllocatorConfig<T extends Request<T>> {
private final ResourceEstimator<T> resourceEstimator;
private int allowedThreads = 1;
private Config limitedScopeConfig;
+ private String storeRejectedRequestsSetting = StoreRejectedRequestsConfig.MIN.name();
public Builder(ResourceEstimator<T> resourceEstimator) {
this.resourceEstimator = resourceEstimator;
@@ -68,11 +74,17 @@ public class RequestAllocatorConfig<T extends Request<T>> {
return this;
}
+ public Builder<T> storeRejectedRequests(String storeRejectedRequestsSetting) {
+ this.storeRejectedRequestsSetting = storeRejectedRequestsSetting;
+ return this;
+ }
+
public RequestAllocatorConfig<T> build() {
if (this.limitedScopeConfig == null) {
this.limitedScopeConfig = ConfigFactory.empty();
}
- return new RequestAllocatorConfig<>(this.prioritizer, this.resourceEstimator, this.allowedThreads, this.limitedScopeConfig);
+ return new RequestAllocatorConfig<>(this.prioritizer, this.resourceEstimator, this.allowedThreads,
+ this.limitedScopeConfig, this.storeRejectedRequestsSetting);
}
}
@@ -82,5 +94,4 @@ public class RequestAllocatorConfig<T extends Request<T>> {
return 0;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
index 58a56ea..2145bc0 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
@@ -32,10 +32,12 @@ public class ConcurrentBoundedPriorityIterableTest {
public static final String MEMORY = "memory";
@Test
- public void test() throws Exception {
+ public void test()
+ throws Exception {
- ConcurrentBoundedPriorityIterable<String> iterable = new ConcurrentBoundedPriorityIterable<>(new MyComparator(),
- new MyEstimator(), ResourcePool.builder().maxResource(MEMORY, 100.).build());
+ ConcurrentBoundedPriorityIterable<String> iterable =
+ new ConcurrentBoundedPriorityIterable<>(new MyComparator(), new MyEstimator(), "min",
+ ResourcePool.builder().maxResource(MEMORY, 100.).build());
// doesn't fit
Assert.assertFalse(iterable.add("a-500"));
@@ -57,8 +59,8 @@ public class ConcurrentBoundedPriorityIterableTest {
Assert.assertTrue(iterable.add("b-50"));
// Check items
- List<String> items = Lists.newArrayList(Iterators.transform(iterable.iterator(),
- new AllocatedRequestsIteratorBase.TExtractor<String>()));
+ List<String> items = Lists
+ .newArrayList(Iterators.transform(iterable.iterator(), new AllocatedRequestsIteratorBase.TExtractor<String>()));
Assert.assertEquals(items.size(), 2);
Assert.assertEquals(items.get(0), "b-50");
Assert.assertEquals(items.get(1), "d-50");
@@ -66,15 +68,15 @@ public class ConcurrentBoundedPriorityIterableTest {
iterable.reopen();
// a high priority that won't fit even with evictions should not evict anything
Assert.assertFalse(iterable.add("c-500"));
- items = Lists.newArrayList(Iterators.transform(iterable.iterator(),
- new AllocatedRequestsIteratorBase.TExtractor<String>()));
+ items = Lists
+ .newArrayList(Iterators.transform(iterable.iterator(), new AllocatedRequestsIteratorBase.TExtractor<String>()));
Assert.assertEquals(items.size(), 2);
iterable.reopen();
// even if it is higher priority than everything else
Assert.assertFalse(iterable.add("a-500"));
- items = Lists.newArrayList(Iterators.transform(iterable.iterator(),
- new AllocatedRequestsIteratorBase.TExtractor<String>()));
+ items = Lists
+ .newArrayList(Iterators.transform(iterable.iterator(), new AllocatedRequestsIteratorBase.TExtractor<String>()));
Assert.assertEquals(items.size(), 2);
}
@@ -94,5 +96,4 @@ public class ConcurrentBoundedPriorityIterableTest {
return resourcePool.getResourceRequirementBuilder().setRequirement(MEMORY, memory).build();
}
}
-
}
\ No newline at end of file