You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:44 UTC

[26/50] incubator-gobblin git commit: [GOBBLIN-379] Submit an event when DistCp job resource requirements exceed a hard bound

[GOBBLIN-379] Submit an event when DistCp job resource requirements exceed a hard bound

Closes #2257 from sv2000/gobblin-379


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/457ede26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/457ede26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/457ede26

Branch: refs/heads/0.12.0
Commit: 457ede26da2c7f693d4fe321d4c50b0e25e0d22d
Parents: bde5bb1
Author: suvasude <su...@linkedin.biz>
Authored: Thu Feb 8 11:37:21 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Feb 8 11:37:21 2018 -0800

----------------------------------------------------------------------
 .../data/management/copy/CopyConfiguration.java |  17 ++-
 .../data/management/copy/CopySource.java        | 105 ++++++++++++++-----
 .../data/management/copy/CopySourceTest.java    |  84 +++++++++++++++
 .../ConcurrentBoundedPriorityIterable.java      |  80 +++++++++-----
 .../PriorityIterableBasedRequestAllocator.java  |  71 +++++++++----
 .../RequestAllocatorConfig.java                 |  21 +++-
 .../ConcurrentBoundedPriorityIterableTest.java  |  21 ++--
 7 files changed, 309 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
index 211ad13..c4d07e2 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
@@ -23,6 +23,8 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 
+import org.apache.gobblin.util.request_allocation.ConcurrentBoundedPriorityIterable;
+import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -65,6 +67,13 @@ public class CopyConfiguration {
 
   public static final String ABORT_ON_SINGLE_DATASET_FAILURE = COPY_PREFIX + ".abortOnSingleDatasetFailure";
 
+  /*
+   * Config to store different classes of rejected requests. Possible values are "all","none", or "min" (default).
+   */
+  public static final String STORE_REJECTED_REQUESTS_KEY = COPY_PREFIX + ".store.rejected.requests";
+  public static final String DEFAULT_STORE_REJECTED_REQUESTS =
+      RequestAllocatorConfig.StoreRejectedRequestsConfig.MIN.name();
+
   /**
    * User supplied directory where files should be published. This value is identical for all datasets in the distcp job.
    */
@@ -81,6 +90,7 @@ public class CopyConfiguration {
   private final FileSystem targetFs;
   private final Optional<FileSetComparator> prioritizer;
   private final ResourcePool maxToCopy;
+  private final String storeRejectedRequestsSetting;
 
   private final Config config;
 
@@ -114,8 +124,8 @@ public class CopyConfiguration {
       if (properties.containsKey(PRIORITIZER_ALIAS_KEY)) {
         try {
           this.prioritizer = Optional.of(GobblinConstructorUtils.<FileSetComparator>invokeLongestConstructor(
-              new ClassAliasResolver(FileSetComparator.class).resolveClass(
-                  properties.getProperty(PRIORITIZER_ALIAS_KEY)), properties));
+              new ClassAliasResolver(FileSetComparator.class)
+                  .resolveClass(properties.getProperty(PRIORITIZER_ALIAS_KEY)), properties));
         } catch (ReflectiveOperationException roe) {
           throw new RuntimeException("Could not build prioritizer.", roe);
         }
@@ -124,6 +134,9 @@ public class CopyConfiguration {
       }
       this.maxToCopy = CopyResourcePool.fromConfig(ConfigUtils.getConfigOrEmpty(this.config, MAX_COPY_PREFIX));
 
+      this.storeRejectedRequestsSetting =
+          properties.getProperty(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY, DEFAULT_STORE_REJECTED_REQUESTS);
+
       this.abortOnSingleDatasetFailure = false;
       if (this.config.hasPath(ABORT_ON_SINGLE_DATASET_FAILURE)) {
         this.abortOnSingleDatasetFailure = this.config.getBoolean(ABORT_ON_SINGLE_DATASET_FAILURE);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 615d6ad..3355f3d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -29,6 +29,9 @@ import java.util.concurrent.Future;
 
 import javax.annotation.Nullable;
 
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
@@ -37,6 +40,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Predicates;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
@@ -62,6 +66,7 @@ import org.apache.gobblin.dataset.IterableDatasetFinder;
 import org.apache.gobblin.dataset.IterableDatasetFinderImpl;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -84,13 +89,11 @@ import org.apache.gobblin.util.guid.Guid;
 import org.apache.gobblin.util.request_allocation.GreedyAllocator;
 import org.apache.gobblin.util.request_allocation.HierarchicalAllocator;
 import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer;
+import org.apache.gobblin.util.request_allocation.PriorityIterableBasedRequestAllocator;
 import org.apache.gobblin.util.request_allocation.RequestAllocator;
 import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
 import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
 
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
 
 /**
  * {@link org.apache.gobblin.source.Source} that generates work units from {@link org.apache.gobblin.data.management.copy.CopyableDataset}s.
@@ -112,11 +115,21 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
   public static final String SIMULATE = CopyConfiguration.COPY_PREFIX + ".simulate";
   public static final String MAX_SIZE_MULTI_WORKUNITS = CopyConfiguration.COPY_PREFIX + ".binPacking.maxSizePerBin";
   public static final String MAX_WORK_UNITS_PER_BIN = CopyConfiguration.COPY_PREFIX + ".binPacking.maxWorkUnitsPerBin";
+  public static final String REQUESTS_EXCEEDING_AVAILABLE_RESOURCE_POOL_EVENT_NAME =
+      "RequestsExceedingAvailableResourcePoolEvent";
+  public static final String REQUESTS_DROPPED_EVENT_NAME = "RequestsDroppedEvent";
+  public static final String REQUESTS_REJECTED_DUE_TO_INSUFFICIENT_EVICTION_EVENT_NAME =
+      "RequestsRejectedDueToInsufficientEvictionEvent";
+  public static final String REQUESTS_REJECTED_WITH_LOW_PRIORITY_EVENT_NAME = "RequestsRejectedWithLowPriorityEvent";
+  public static final String FILESET_NAME = "fileset.name";
+  public static final String FILESET_TOTAL_ENTITIES = "fileset.total.entities";
+  public static final String FILESET_TOTAL_SIZE_IN_BYTES = "fileset.total.size";
 
   private static final String WORK_UNIT_WEIGHT = CopyConfiguration.COPY_PREFIX + ".workUnitWeight";
   private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT);
 
   public MetricContext metricContext;
+  public EventSubmitter eventSubmitter;
 
   protected Optional<LineageInfo> lineageInfo;
 
@@ -145,16 +158,17 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
 
     try {
 
-      DeprecationUtils.renameDeprecatedKeys(state, CopyConfiguration.MAX_COPY_PREFIX + "." + CopyResourcePool.ENTITIES_KEY,
-          Lists.newArrayList(MAX_FILES_COPIED_KEY));
+      DeprecationUtils
+          .renameDeprecatedKeys(state, CopyConfiguration.MAX_COPY_PREFIX + "." + CopyResourcePool.ENTITIES_KEY,
+              Lists.newArrayList(MAX_FILES_COPIED_KEY));
 
       final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
       final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
       state.setProp(SlaEventKeys.SOURCE_URI, sourceFs.getUri());
       state.setProp(SlaEventKeys.DESTINATION_URI, targetFs.getUri());
 
-      log.info("Identified source file system at {} and target file system at {}.",
-          sourceFs.getUri(), targetFs.getUri());
+      log.info("Identified source file system at {} and target file system at {}.", sourceFs.getUri(),
+          targetFs.getUri());
 
       long maxSizePerBin = state.getPropAsLong(MAX_SIZE_MULTI_WORKUNITS, 0);
       long maxWorkUnitsPerMultiWorkUnit = state.getPropAsLong(MAX_WORK_UNITS_PER_BIN, 50);
@@ -165,26 +179,31 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
 
       final CopyConfiguration copyConfiguration = CopyConfiguration.builder(targetFs, state.getProperties()).build();
 
+      this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, CopyConfiguration.COPY_PREFIX).build();
       DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils
           .instantiateDatasetFinder(state.getProperties(), sourceFs, DEFAULT_DATASET_PROFILE_CLASS_KEY,
-              new EventSubmitter.Builder(this.metricContext, CopyConfiguration.COPY_PREFIX).build(), state);
+              this.eventSubmitter, state);
 
       IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder =
           datasetFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder<CopyableDatasetBase>) datasetFinder
               : new IterableDatasetFinderImpl<>(datasetFinder);
 
-      Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls =
-          Iterators.transform(iterableDatasetFinder.getDatasetsIterator(),
+      Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls = Iterators
+          .transform(iterableDatasetFinder.getDatasetsIterator(),
               new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, log));
-      Iterator<CopyableDatasetRequestor> requestorIterator = Iterators.filter(requestorIteratorWithNulls,
-          Predicates.<CopyableDatasetRequestor>notNull());
+      Iterator<CopyableDatasetRequestor> requestorIterator =
+          Iterators.filter(requestorIteratorWithNulls, Predicates.<CopyableDatasetRequestor>notNull());
 
       final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitsMap =
           Multimaps.<FileSet<CopyEntity>, WorkUnit>synchronizedSetMultimap(
               HashMultimap.<FileSet<CopyEntity>, WorkUnit>create());
 
       RequestAllocator<FileSet<CopyEntity>> allocator = createRequestAllocator(copyConfiguration, maxThreads);
-      Iterator<FileSet<CopyEntity>> prioritizedFileSets = allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy());
+      Iterator<FileSet<CopyEntity>> prioritizedFileSets =
+          allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy());
+
+      //Submit alertable events for unfulfilled requests
+      submitUnfulfilledRequestEvents(allocator);
 
       Iterator<Callable<Void>> callableIterator =
           Iterators.transform(prioritizedFileSets, new Function<FileSet<CopyEntity>, Callable<Void>>() {
@@ -197,8 +216,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
           });
 
       try {
-        List<Future<Void>> futures = new IteratorExecutor<>(callableIterator,
-            maxThreads,
+        List<Future<Void>> futures = new IteratorExecutor<>(callableIterator, maxThreads,
             ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), Optional.of("Copy-file-listing-pool-%d")))
             .execute();
 
@@ -231,8 +249,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
         return Lists.newArrayList();
       }
 
-      List<? extends WorkUnit> workUnits =
-          new WorstFitDecreasingBinPacking(maxSizePerBin).pack(Lists.newArrayList(workUnitsMap.values()), this.weighter);
+      List<? extends WorkUnit> workUnits = new WorstFitDecreasingBinPacking(maxSizePerBin)
+          .pack(Lists.newArrayList(workUnitsMap.values()), this.weighter);
       log.info(String.format(
           "Bin packed work units. Initial work units: %d, packed work units: %d, max weight per bin: %d, "
               + "max work units per bin: %d.", workUnitsMap.size(), workUnits.size(), maxSizePerBin,
@@ -243,11 +261,42 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
     }
   }
 
-  private RequestAllocator<FileSet<CopyEntity>> createRequestAllocator(CopyConfiguration copyConfiguration, int maxThreads) {
-    Optional<FileSetComparator> prioritizer = copyConfiguration.getPrioritizer();
+  private void submitUnfulfilledRequestEventsHelper(List<FileSet<CopyEntity>> fileSetList, String eventName) {
+    for (FileSet<CopyEntity> fileSet : fileSetList) {
+      GobblinTrackingEvent event =
+          GobblinTrackingEvent.newBuilder().setName(eventName).setNamespace(CopySource.class.getName()).setMetadata(
+              ImmutableMap.<String, String>builder()
+                  .put(ConfigurationKeys.DATASET_URN_KEY, fileSet.getDataset().getUrn())
+                  .put(FILESET_TOTAL_ENTITIES, Integer.toString(fileSet.getTotalEntities()))
+                  .put(FILESET_TOTAL_SIZE_IN_BYTES, Long.toString(fileSet.getTotalSizeInBytes()))
+                  .put(FILESET_NAME, fileSet.getName()).build()).build();
+      this.metricContext.submitEvent(event);
+    }
+  }
 
+  private void submitUnfulfilledRequestEvents(RequestAllocator<FileSet<CopyEntity>> allocator) {
+    if (PriorityIterableBasedRequestAllocator.class.isAssignableFrom(allocator.getClass())) {
+      PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> priorityIterableBasedRequestAllocator =
+          (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) allocator;
+      submitUnfulfilledRequestEventsHelper(
+          priorityIterableBasedRequestAllocator.getRequestsExceedingAvailableResourcePool(),
+          REQUESTS_EXCEEDING_AVAILABLE_RESOURCE_POOL_EVENT_NAME);
+      submitUnfulfilledRequestEventsHelper(
+          priorityIterableBasedRequestAllocator.getRequestsRejectedDueToInsufficientEviction(),
+          REQUESTS_REJECTED_DUE_TO_INSUFFICIENT_EVICTION_EVENT_NAME);
+      submitUnfulfilledRequestEventsHelper(priorityIterableBasedRequestAllocator.getRequestsRejectedWithLowPriority(),
+          REQUESTS_REJECTED_WITH_LOW_PRIORITY_EVENT_NAME);
+      submitUnfulfilledRequestEventsHelper(priorityIterableBasedRequestAllocator.getRequestsDropped(),
+          REQUESTS_DROPPED_EVENT_NAME);
+    }
+  }
+
+  private RequestAllocator<FileSet<CopyEntity>> createRequestAllocator(CopyConfiguration copyConfiguration,
+      int maxThreads) {
+    Optional<FileSetComparator> prioritizer = copyConfiguration.getPrioritizer();
     RequestAllocatorConfig.Builder<FileSet<CopyEntity>> configBuilder =
         RequestAllocatorConfig.builder(new FileSetResourceEstimator()).allowParallelization(maxThreads)
+            .storeRejectedRequests(copyConfiguration.getStoreRejectedRequestsSetting())
             .withLimitedScopeConfig(copyConfiguration.getPrioritizationConfig());
 
     if (!prioritizer.isPresent()) {
@@ -323,9 +372,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
        * a DatasetFinder. Consequently, the source and destination dataset for the CopyableFile lineage are expected
        * to be set by the same logic
        */
-      if (lineageInfo.isPresent() &&
-          copyableFile.getSourceDataset() != null &&
-          copyableFile.getDestinationDataset() != null) {
+      if (lineageInfo.isPresent() && copyableFile.getSourceDataset() != null
+          && copyableFile.getDestinationDataset() != null) {
         lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit);
       }
     }
@@ -350,7 +398,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
     return new EmptyExtractor<>("empty");
   }
 
-  protected Extractor<String, FileAwareInputStream> extractorForCopyableFile(FileSystem fs, CopyableFile cf, WorkUnitState state)
+  protected Extractor<String, FileAwareInputStream> extractorForCopyableFile(FileSystem fs, CopyableFile cf,
+      WorkUnitState state)
       throws IOException {
     return new FileAwareInputStreamExtractor(fs, cf, state);
   }
@@ -365,7 +414,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
   @Deprecated
   protected FileSystem getSourceFileSystem(State state)
       throws IOException {
-    Configuration conf = HadoopUtils.getConfFromState(state, Optional.of(ConfigurationKeys.SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH));
+    Configuration conf =
+        HadoopUtils.getConfFromState(state, Optional.of(ConfigurationKeys.SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH));
     String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);
     return HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), conf), state);
   }
@@ -456,11 +506,12 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
     return CopyableDatasetMetadata.deserialize(state.getProp(SERIALIZED_COPYABLE_DATASET));
   }
 
-  private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator, CopyEntity copyEntity)
+  private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator,
+      CopyEntity copyEntity)
       throws IOException {
-    if (copyEntity instanceof  CopyableFile) {
+    if (copyEntity instanceof CopyableFile) {
       Optional<WatermarkInterval> watermarkIntervalOptional =
-          CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile)copyEntity, watermarkGenerator);
+          CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile) copyEntity, watermarkGenerator);
       if (watermarkIntervalOptional.isPresent()) {
         workUnit.setWatermarkInterval(watermarkIntervalOptional.get());
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
index d9e5368..377b2cf 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
@@ -17,19 +17,41 @@
 
 package org.apache.gobblin.data.management.copy;
 
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Iterator;
 import java.util.List;
 
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
+import org.apache.hadoop.fs.FileSystem;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterators;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.data.management.dataset.DatasetUtils;
+import org.apache.gobblin.data.management.partition.CopyableDatasetRequestor;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.dataset.IterableDatasetFinderImpl;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.request_allocation.PriorityIterableBasedRequestAllocator;
 
 
+@Slf4j
 public class CopySourceTest {
 
   @Test
@@ -106,4 +128,66 @@ public class CopySourceTest {
     Assert.assertNotNull(extractBelow);
   }
 
+  @Test
+  public void testSubmitUnfulfilledRequestEvents()
+      throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    SourceState state = new SourceState();
+
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+    state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+    state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir");
+    state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+        TestCopyablePartitionableDatasedFinder.class.getCanonicalName());
+    state.setProp(CopySource.MAX_CONCURRENT_LISTING_SERVICES, 2);
+    state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".size", "50");
+    state.setProp(CopyConfiguration.MAX_COPY_PREFIX + ".copyEntities", 2);
+    state.setProp(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY,
+        RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name().toLowerCase());
+    state.setProp(ConfigurationKeys.METRICS_CUSTOM_BUILDERS, "org.apache.gobblin.metrics.ConsoleEventReporterFactory");
+
+    CopySource source = new CopySource();
+
+    final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
+    final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
+
+    int maxThreads = state
+        .getPropAsInt(CopySource.MAX_CONCURRENT_LISTING_SERVICES, CopySource.DEFAULT_MAX_CONCURRENT_LISTING_SERVICES);
+
+    final CopyConfiguration copyConfiguration = CopyConfiguration.builder(targetFs, state.getProperties()).build();
+
+    MetricContext metricContext = Instrumented.getMetricContext(state, CopySource.class);
+    EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext, CopyConfiguration.COPY_PREFIX).build();
+    DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils
+        .instantiateDatasetFinder(state.getProperties(), sourceFs, CopySource.DEFAULT_DATASET_PROFILE_CLASS_KEY,
+            eventSubmitter, state);
+
+    IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder =
+        datasetFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder<CopyableDatasetBase>) datasetFinder
+            : new IterableDatasetFinderImpl<>(datasetFinder);
+
+    Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls = Iterators
+        .transform(iterableDatasetFinder.getDatasetsIterator(),
+            new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, log));
+    Iterator<CopyableDatasetRequestor> requestorIterator =
+        Iterators.filter(requestorIteratorWithNulls, Predicates.<CopyableDatasetRequestor>notNull());
+
+    Method m = CopySource.class.getDeclaredMethod("createRequestAllocator", CopyConfiguration.class, int.class);
+    m.setAccessible(true);
+    PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>> allocator =
+        (PriorityIterableBasedRequestAllocator<FileSet<CopyEntity>>) m.invoke(source, copyConfiguration, maxThreads);
+    Iterator<FileSet<CopyEntity>> prioritizedFileSets =
+        allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy());
+    List<FileSet<CopyEntity>> fileSetList = allocator.getRequestsExceedingAvailableResourcePool();
+    Assert.assertEquals(fileSetList.size(), 2);
+
+    FileSet<CopyEntity> fileSet = fileSetList.get(0);
+    Assert.assertEquals(fileSet.getDataset().getUrn(), "/test");
+    Assert.assertEquals(fileSet.getTotalEntities(), 5);
+    Assert.assertEquals(fileSet.getTotalSizeInBytes(), 50);
+
+    fileSet = fileSetList.get(1);
+    Assert.assertEquals(fileSet.getDataset().getUrn(), "/test");
+    Assert.assertEquals(fileSet.getTotalEntities(), 5);
+    Assert.assertEquals(fileSet.getTotalSizeInBytes(), 50);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
index c1a4505..bcfcf29 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterable.java
@@ -22,15 +22,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.slf4j.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
 
 /**
  * A concurrent bounded priority {@link Iterable}. Given a {@link ResourcePool}, a {@link ResourceEstimator}, and a
@@ -65,12 +65,24 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
   private int requestsRefused = 0;
   private int requestsEvicted = 0;
 
+  //These are for submitting alertable events
+  private String storeRejectedRequestsSetting;
+  @Getter
+  private List<T> requestsExceedingAvailableResourcePool = Lists.newArrayList();
+  @Getter
+  private List<T> requestsRejectedWithLowPriority = Lists.newArrayList();
+  @Getter
+  private List<T> requestsRejectedDueToInsufficientEviction = Lists.newArrayList();
+  @Getter
+  private List<T> requestsDropped = Lists.newArrayList();
+
   // These are ResourceRequirements for temporary use to avoid instantiation costs
   private final ResourceRequirement candidateRequirement;
   private final ResourceRequirement tmpRequirement;
   private final ResourceRequirement reuse;
 
-  public ConcurrentBoundedPriorityIterable(final Comparator<? super T> prioritizer, ResourceEstimator<T> resourceEstimator, ResourcePool pool) {
+  public ConcurrentBoundedPriorityIterable(final Comparator<? super T> prioritizer,
+      ResourceEstimator<T> resourceEstimator, String storeRejectedRequestsSetting, ResourcePool pool) {
 
     this.estimator = resourceEstimator;
     this.resourcePool = pool;
@@ -79,6 +91,8 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
     this.allDifferentComparator = new AllDifferentComparator();
     this.elements = new TreeSet<>(this.allDifferentComparator);
 
+    this.storeRejectedRequestsSetting = storeRejectedRequestsSetting;
+
     this.currentRequirement = this.resourcePool.getResourceRequirementBuilder().zero().build();
     this.maxResourceRequirement = new ResourceRequirement(this.currentRequirement);
 
@@ -94,7 +108,8 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
    */
   private class AllDifferentComparator implements Comparator<AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>> {
     @Override
-    public int compare(AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t1, AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t2) {
+    public int compare(AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t1,
+        AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> t2) {
       int providedComparison = ConcurrentBoundedPriorityIterable.this.comparator.compare(t1.getT(), t2.getT());
       if (providedComparison != 0) {
         return providedComparison;
@@ -111,12 +126,13 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
    */
   public boolean add(T t) {
     if (this.closed) {
-      throw new RuntimeException(ConcurrentBoundedPriorityIterable.class.getSimpleName() + " is no longer accepting requests!");
+      throw new RuntimeException(
+          ConcurrentBoundedPriorityIterable.class.getSimpleName() + " is no longer accepting requests!");
     }
 
-    AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>
-        newElement = new AllocatedRequestsIteratorBase.RequestWithResourceRequirement<>(t,
-        this.estimator.estimateRequirement(t, this.resourcePool));
+    AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> newElement =
+        new AllocatedRequestsIteratorBase.RequestWithResourceRequirement<>(t,
+            this.estimator.estimateRequirement(t, this.resourcePool));
     boolean addedWorkunits = addImpl(newElement);
     if (!addedWorkunits) {
       this.rejectedElement = true;
@@ -132,21 +148,30 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
     if (this.resourcePool.exceedsHardBound(newElement.getResourceRequirement(), false)) {
       // item does not fit even in empty pool
       log.warn(String.format("Request %s is larger than the available resource pool. If the pool is not expanded, "
-          + "it will never be selected. Request: %s.", newElement.getT(),
+              + "it will never be selected. Request: %s.", newElement.getT(),
           this.resourcePool.stringifyRequirement(newElement.getResourceRequirement())));
+      if (!this.storeRejectedRequestsSetting
+          .equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.NONE.name())) {
+        this.requestsExceedingAvailableResourcePool.add(newElement.getT());
+      }
       this.requestsRefused++;
       return false;
     }
 
-    ResourceRequirement candidateRequirement =
-        ResourceRequirement.add(this.currentRequirement, newElement.getResourceRequirement(), this.candidateRequirement);
+    ResourceRequirement candidateRequirement = ResourceRequirement
+        .add(this.currentRequirement, newElement.getResourceRequirement(), this.candidateRequirement);
 
     if (this.resourcePool.exceedsHardBound(candidateRequirement, false)) {
 
       if (this.comparator.compare(this.elements.last().getT(), newElement.getT()) <= 0) {
-        log.debug("Request {} does not fit in resource pool and is lower priority than current lowest priority request. "
-            + "Rejecting", newElement.getT());
+        log.debug(
+            "Request {} does not fit in resource pool and is lower priority than current lowest priority request. "
+                + "Rejecting", newElement.getT());
         this.requestsRefused++;
+        if (this.storeRejectedRequestsSetting
+            .equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name())) {
+          this.requestsRejectedWithLowPriority.add(newElement.getT());
+        }
         return false;
       }
 
@@ -154,11 +179,15 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
 
       this.currentRequirement.copyInto(this.tmpRequirement);
 
-      for (AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> dropCandidate : this.elements.descendingSet()) {
+      for (AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> dropCandidate : this.elements
+          .descendingSet()) {
         if (this.comparator.compare(dropCandidate.getT(), newElement.getT()) <= 0) {
-          log.debug("Cannot evict enough requests to fit request {}. "
-              + "Rejecting", newElement.getT());
+          log.debug("Cannot evict enough requests to fit request {}. " + "Rejecting", newElement.getT());
           this.requestsRefused++;
+          if (this.storeRejectedRequestsSetting
+              .equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name())) {
+            this.requestsRejectedDueToInsufficientEviction.add(newElement.getT());
+          }
           return false;
         }
         this.tmpRequirement.subtract(dropCandidate.getResourceRequirement());
@@ -172,6 +201,10 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
       for (AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T> drop : toDrop) {
         log.debug("Evicting request {}.", drop.getT());
         this.requestsEvicted++;
+        if (this.storeRejectedRequestsSetting
+            .equalsIgnoreCase(RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name())) {
+          this.requestsDropped.add(drop.getT());
+        }
         this.elements.remove(drop);
         this.currentRequirement.subtract(drop.getResourceRequirement());
       }
@@ -205,13 +238,13 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
     StringBuilder messageBuilder = new StringBuilder("Statistics for ").
         append(ConcurrentBoundedPriorityIterable.class.getSimpleName()).append(": {");
     messageBuilder.append(this.resourcePool).append(", ");
-    messageBuilder.append("totalResourcesUsed: ").append(this.resourcePool.stringifyRequirement(this.currentRequirement))
-        .append(", ");
-    messageBuilder.append("maxRequirementPerDimension: ").append(this.resourcePool.stringifyRequirement(this.maxResourceRequirement))
-        .append(", ");
+    messageBuilder.append("totalResourcesUsed: ")
+        .append(this.resourcePool.stringifyRequirement(this.currentRequirement)).append(", ");
+    messageBuilder.append("maxRequirementPerDimension: ")
+        .append(this.resourcePool.stringifyRequirement(this.maxResourceRequirement)).append(", ");
     messageBuilder.append("requestsOffered: ").append(this.requestsOffered).append(", ");
-    messageBuilder.append("requestsAccepted: ").append(this.requestsOffered - this.requestsEvicted - this.requestsRefused)
-        .append(", ");
+    messageBuilder.append("requestsAccepted: ")
+        .append(this.requestsOffered - this.requestsEvicted - this.requestsRefused).append(", ");
     messageBuilder.append("requestsRefused: ").append(this.requestsRefused).append(", ");
     messageBuilder.append("requestsEvicted: ").append(this.requestsEvicted);
     messageBuilder.append("}");
@@ -228,5 +261,4 @@ public class ConcurrentBoundedPriorityIterable<T> implements Iterable<AllocatedR
     this.closed = true;
     return this.elements.iterator();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
index 876a4eb..f5be73e 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
@@ -22,6 +22,9 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
+import lombok.AccessLevel;
+import lombok.Getter;
+
 import org.slf4j.Logger;
 
 import com.google.common.base.Function;
@@ -32,22 +35,35 @@ import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-
 
-@AllArgsConstructor
 public abstract class PriorityIterableBasedRequestAllocator<T extends Request<T>> implements RequestAllocator<T> {
 
   private final Logger log;
   @Getter(value = AccessLevel.PROTECTED)
   private final RequestAllocatorConfig<T> configuration;
 
+  //These are for submitting alertable events
+  @Getter
+  private List<T> requestsExceedingAvailableResourcePool;
+  @Getter
+  private List<T> requestsRejectedWithLowPriority;
+  @Getter
+  private List<T> requestsRejectedDueToInsufficientEviction;
+  @Getter
+  private List<T> requestsDropped;
+
+  public PriorityIterableBasedRequestAllocator(Logger log, RequestAllocatorConfig<T> configuration) {
+    this.log = log;
+    this.configuration = configuration;
+  }
+
   @Override
-  public AllocatedRequestsIterator<T> allocateRequests(Iterator<? extends Requestor<T>> requestors, ResourcePool resourcePool) {
+  public AllocatedRequestsIterator<T> allocateRequests(Iterator<? extends Requestor<T>> requestors,
+      ResourcePool resourcePool) {
     final ConcurrentBoundedPriorityIterable<T> iterable =
-        new ConcurrentBoundedPriorityIterable<>(this.configuration.getPrioritizer(), this.configuration.getResourceEstimator(), resourcePool);
+        new ConcurrentBoundedPriorityIterable<>(this.configuration.getPrioritizer(),
+            this.configuration.getResourceEstimator(), this.configuration.getStoreRejectedRequestsSetting(),
+            resourcePool);
 
     final Iterator<T> joinIterator = getJoinIterator(requestors, iterable);
 
@@ -57,36 +73,47 @@ public abstract class PriorityIterableBasedRequestAllocator<T extends Request<T>
       }
     } else {
 
-      IteratorExecutor<Void> executor = new IteratorExecutor<>(Iterators.transform(joinIterator, new Function<T, Callable<Void>>() {
-        @Override
-        public Callable<Void> apply(final T input) {
-          return new Callable<Void>() {
+      IteratorExecutor<Void> executor =
+          new IteratorExecutor<>(Iterators.transform(joinIterator, new Function<T, Callable<Void>>() {
             @Override
-            public Void call()
-                throws Exception {
-              iterable.add(input);
-              return null;
+            public Callable<Void> apply(final T input) {
+              return new Callable<Void>() {
+                @Override
+                public Void call()
+                    throws Exception {
+                  iterable.add(input);
+                  return null;
+                }
+              };
             }
-          };
-        }
-      }), this.configuration.getAllowedThreads(),
-          ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("request-allocator-%d")));
+          }), this.configuration.getAllowedThreads(),
+              ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("request-allocator-%d")));
 
       try {
         List<Either<Void, ExecutionException>> results = executor.executeAndGetResults();
         IteratorExecutor.logFailures(results, log, 10);
       } catch (InterruptedException ie) {
         log.error("Request allocation was interrupted.");
-        return new AllocatedRequestsIteratorBase<>(Iterators.<AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>>emptyIterator(),
-            resourcePool);
+        return new AllocatedRequestsIteratorBase<>(
+            Iterators.<AllocatedRequestsIteratorBase.RequestWithResourceRequirement<T>>emptyIterator(), resourcePool);
       }
     }
 
     iterable.logStatistics(Optional.of(this.log));
+
+    //Get all requests rejected/dropped
+    getRejectedAndDroppedRequests(iterable);
+
     return new AllocatedRequestsIteratorBase<>(iterable.iterator(), resourcePool);
   }
 
+  public void getRejectedAndDroppedRequests(ConcurrentBoundedPriorityIterable<T> iterable) {
+    requestsExceedingAvailableResourcePool = iterable.getRequestsExceedingAvailableResourcePool();
+    requestsRejectedWithLowPriority = iterable.getRequestsRejectedWithLowPriority();
+    requestsRejectedDueToInsufficientEviction = iterable.getRequestsRejectedDueToInsufficientEviction();
+    requestsDropped = iterable.getRequestsDropped();
+  }
+
   protected abstract Iterator<T> getJoinIterator(Iterator<? extends Requestor<T>> requestors,
       ConcurrentBoundedPriorityIterable<T> requestIterable);
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
index b33070b..5a3051a 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/RequestAllocatorConfig.java
@@ -20,12 +20,12 @@ package org.apache.gobblin.util.request_allocation;
 import java.io.Serializable;
 import java.util.Comparator;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
 
 @AllArgsConstructor
 @Getter
@@ -34,6 +34,11 @@ public class RequestAllocatorConfig<T extends Request<T>> {
   private final ResourceEstimator<T> resourceEstimator;
   private final int allowedThreads;
   private Config limitedScopeConfig;
+  private String storeRejectedRequestsSetting;
+
+  public enum StoreRejectedRequestsConfig {
+    ALL, MIN, NONE
+  }
 
   public static <T extends Request<T>> Builder<T> builder(ResourceEstimator<T> resourceEstimator) {
     return new Builder<>(resourceEstimator);
@@ -44,6 +49,7 @@ public class RequestAllocatorConfig<T extends Request<T>> {
     private final ResourceEstimator<T> resourceEstimator;
     private int allowedThreads = 1;
     private Config limitedScopeConfig;
+    private String storeRejectedRequestsSetting = StoreRejectedRequestsConfig.MIN.name();
 
     public Builder(ResourceEstimator<T> resourceEstimator) {
       this.resourceEstimator = resourceEstimator;
@@ -68,11 +74,17 @@ public class RequestAllocatorConfig<T extends Request<T>> {
       return this;
     }
 
+    public Builder<T> storeRejectedRequests(String storeRejectedRequestsSetting) {
+      this.storeRejectedRequestsSetting = storeRejectedRequestsSetting;
+      return this;
+    }
+
     public RequestAllocatorConfig<T> build() {
       if (this.limitedScopeConfig == null) {
         this.limitedScopeConfig = ConfigFactory.empty();
       }
-      return new RequestAllocatorConfig<>(this.prioritizer, this.resourceEstimator, this.allowedThreads, this.limitedScopeConfig);
+      return new RequestAllocatorConfig<>(this.prioritizer, this.resourceEstimator, this.allowedThreads,
+          this.limitedScopeConfig, this.storeRejectedRequestsSetting);
     }
   }
 
@@ -82,5 +94,4 @@ public class RequestAllocatorConfig<T extends Request<T>> {
       return 0;
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/457ede26/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
index 58a56ea..2145bc0 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/ConcurrentBoundedPriorityIterableTest.java
@@ -32,10 +32,12 @@ public class ConcurrentBoundedPriorityIterableTest {
   public static final String MEMORY = "memory";
 
   @Test
-  public void test() throws Exception {
+  public void test()
+      throws Exception {
 
-    ConcurrentBoundedPriorityIterable<String> iterable = new ConcurrentBoundedPriorityIterable<>(new MyComparator(),
-        new MyEstimator(), ResourcePool.builder().maxResource(MEMORY, 100.).build());
+    ConcurrentBoundedPriorityIterable<String> iterable =
+        new ConcurrentBoundedPriorityIterable<>(new MyComparator(), new MyEstimator(), "min",
+            ResourcePool.builder().maxResource(MEMORY, 100.).build());
 
     // doesn't fit
     Assert.assertFalse(iterable.add("a-500"));
@@ -57,8 +59,8 @@ public class ConcurrentBoundedPriorityIterableTest {
     Assert.assertTrue(iterable.add("b-50"));
 
     // Check items
-    List<String> items = Lists.newArrayList(Iterators.transform(iterable.iterator(),
-        new AllocatedRequestsIteratorBase.TExtractor<String>()));
+    List<String> items = Lists
+        .newArrayList(Iterators.transform(iterable.iterator(), new AllocatedRequestsIteratorBase.TExtractor<String>()));
     Assert.assertEquals(items.size(), 2);
     Assert.assertEquals(items.get(0), "b-50");
     Assert.assertEquals(items.get(1), "d-50");
@@ -66,15 +68,15 @@ public class ConcurrentBoundedPriorityIterableTest {
     iterable.reopen();
     // a high priority that won't fit even with evictions should not evict anything
     Assert.assertFalse(iterable.add("c-500"));
-    items = Lists.newArrayList(Iterators.transform(iterable.iterator(),
-        new AllocatedRequestsIteratorBase.TExtractor<String>()));
+    items = Lists
+        .newArrayList(Iterators.transform(iterable.iterator(), new AllocatedRequestsIteratorBase.TExtractor<String>()));
     Assert.assertEquals(items.size(), 2);
 
     iterable.reopen();
     // even if it is higher priority than everything else
     Assert.assertFalse(iterable.add("a-500"));
-    items = Lists.newArrayList(Iterators.transform(iterable.iterator(),
-        new AllocatedRequestsIteratorBase.TExtractor<String>()));
+    items = Lists
+        .newArrayList(Iterators.transform(iterable.iterator(), new AllocatedRequestsIteratorBase.TExtractor<String>()));
     Assert.assertEquals(items.size(), 2);
   }
 
@@ -94,5 +96,4 @@ public class ConcurrentBoundedPriorityIterableTest {
       return resourcePool.getResourceRequirementBuilder().setRequirement(MEMORY, memory).build();
     }
   }
-
 }
\ No newline at end of file