You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jclouds.apache.org by ga...@apache.org on 2014/03/26 21:48:41 UTC

git commit: JCLOUDS-510 Delete objects in a container efficiently.

Repository: jclouds
Updated Branches:
  refs/heads/master b93cfa42e -> 655aa444d


JCLOUDS-510 Delete objects in a container efficiently.

The existing approach for deleting objects in a container suffers
from a head-of-line blocking problem. This commit implements a better
scheme which does not have that problem. This scheme uses a counting
semaphore for making sure that a certain number of futures are
issued in parallel. As each of these futures is completed, one
permit of the semaphore is released.

Added unit tests for testing this new scheme.


Project: http://git-wip-us.apache.org/repos/asf/jclouds/repo
Commit: http://git-wip-us.apache.org/repos/asf/jclouds/commit/655aa444
Tree: http://git-wip-us.apache.org/repos/asf/jclouds/tree/655aa444
Diff: http://git-wip-us.apache.org/repos/asf/jclouds/diff/655aa444

Branch: refs/heads/master
Commit: 655aa444d71d8a24ac831fa4c3d365042c4a8ebb
Parents: b93cfa4
Author: Shri Javadekar <sh...@maginatics.com>
Authored: Sun Nov 24 23:35:57 2013 -0800
Committer: Andrew Gaul <ga...@apache.org>
Committed: Wed Mar 26 13:48:28 2014 -0700

----------------------------------------------------------------------
 .../strategy/internal/DeleteAllKeysInList.java  | 436 ++++++++++++++-----
 .../internal/DeleteAllKeysInListTest.java       | 149 ++++++-
 core/src/main/java/org/jclouds/Constants.java   |   5 +
 .../jclouds/apis/internal/BaseApiMetadata.java  |  11 +-
 4 files changed, 494 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jclouds/blob/655aa444/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java
----------------------------------------------------------------------
diff --git a/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java b/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java
index 6a9b015..fe8aa4d 100644
--- a/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java
+++ b/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java
@@ -16,12 +16,16 @@
  */
 package org.jclouds.blobstore.strategy.internal;
 
-import static com.google.common.base.Throwables.propagate;
 import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
-import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
 
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Resource;
 import javax.inject.Named;
@@ -29,6 +33,7 @@ import javax.inject.Singleton;
 
 import org.jclouds.Constants;
 import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.ContainerNotFoundException;
 import org.jclouds.blobstore.domain.PageSet;
 import org.jclouds.blobstore.domain.StorageMetadata;
 import org.jclouds.blobstore.internal.BlobRuntimeException;
@@ -39,15 +44,18 @@ import org.jclouds.blobstore.strategy.ClearListStrategy;
 import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
 import org.jclouds.logging.Logger;
 
-import com.google.common.collect.Maps;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.inject.Inject;
 
 /**
  * Deletes all keys in the container
- * 
+ *
  * @author Adrian Cole
+ * @author Shrinand Javadekar
  */
 @Singleton
 public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStrategy {
@@ -56,6 +64,7 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
    protected Logger logger = Logger.NULL;
 
    protected final BackoffLimitedRetryHandler retryHandler;
+
    private final ListeningExecutorService executorService;
 
    protected final BlobStore blobStore;
@@ -66,12 +75,17 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
    /** Maximum times to retry an operation. */
    protected int maxErrors = 3;
 
+   /** Maximum parallel deletes. */
+   private int maxParallelDeletes;
+
    @Inject
    DeleteAllKeysInList(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService executorService,
-         BlobStore blobStore, BackoffLimitedRetryHandler retryHandler) {
+         BlobStore blobStore, BackoffLimitedRetryHandler retryHandler,
+         @Named(Constants.PROPERTY_MAX_PARALLEL_DELETES) int maxParallelDeletes) {
       this.executorService = executorService;
       this.blobStore = blobStore;
       this.retryHandler = retryHandler;
+      this.maxParallelDeletes = maxParallelDeletes;
    }
 
    @Inject(optional = true)
@@ -88,121 +102,337 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
       execute(containerName, recursive());
    }
 
-   public void execute(final String containerName, ListContainerOptions options) {
-      String message = options.getDir() != null ? String.format("clearing path %s/%s",
-               containerName, options.getDir()) : String.format("clearing container %s",
-               containerName);
-      options = options.clone();
-      if (options.isRecursive())
-         message += " recursively";
-      logger.debug(message);
-      Map<StorageMetadata, Exception> exceptions = Maps.newHashMap();
-      for (int numErrors = 0; numErrors < maxErrors; ) {
-         // fetch partial directory listing
-         PageSet<? extends StorageMetadata> listing =
-               blobStore.list(containerName, options);
-
-         // recurse on subdirectories
-         if (options.isRecursive()) {
-            for (StorageMetadata md : listing) {
-               String fullPath = parentIsFolder(options, md) ? options.getDir() + "/"
-                        + md.getName() : md.getName();
-               switch (md.getType()) {
-                  case BLOB:
-                     break;
-                  case FOLDER:
-                  case RELATIVE_PATH:
-                     if (options.isRecursive() && !fullPath.equals(options.getDir())) {
-                        execute(containerName, options.clone().inDirectory(fullPath));
-                     }
-                     break;
-                  case CONTAINER:
-                     throw new IllegalArgumentException("Container type not supported");
+   private boolean parentIsFolder(final ListContainerOptions options,
+         final StorageMetadata md) {
+      return options.getDir() != null && md.getName().indexOf('/') == -1;
+   }
+
+   private void cancelOutstandingFutures(
+         final Set<ListenableFuture<Void>> outstandingFutures) {
+      for (ListenableFuture<Void> future : outstandingFutures) {
+         future.cancel(/*mayInterruptIfRunning=*/ true);
+      }
+   }
+
+   private String getMessage(final String containerName,
+         final ListContainerOptions options) {
+      return options.getDir() != null ? String.format("clearing path %s/%s",
+            containerName, options.getDir()) : String.format(
+            "clearing container %s", containerName);
+   }
+
+   /**
+    * Get the object listing from a given container based on the options. For
+    * recursive listings, call executeOneIteration() (blocking) on each
+    * sub-directory found, to clear it before this listing is returned.
+    *
+    * @param containerName
+    *           The container from which to get the object list.
+    * @param options
+    *           The options used for getting the listing.
+    * @return A PageSet of StorageMetadata objects, or null if the container is not found.
+    */
+   private PageSet<? extends StorageMetadata> getListing(
+         final String containerName,
+         final ListContainerOptions options,
+         final Semaphore semaphore,
+         final Set<ListenableFuture<Void>> outstandingFutures,
+         final AtomicBoolean deleteFailure) {
+      // fetch partial directory listing
+      PageSet<? extends StorageMetadata> listing = null;
+
+      // There's nothing much to do if the container doesn't exist.
+      // Note that if the container has just been created, trying to get the
+      // container listing might throw a ContainerNotFoundException because of
+      // eventual consistency.
+      try {
+         listing = blobStore.list(containerName, options);
+      } catch (ContainerNotFoundException ce) {
+         return listing;
+      }
+
+      // recurse on subdirectories
+      if (options.isRecursive()) {
+         for (StorageMetadata md : listing) {
+            String fullPath = parentIsFolder(options, md) ? options.getDir()
+                  + "/" + md.getName() : md.getName();
+            switch (md.getType()) {
+            case BLOB:
+               break;
+            case FOLDER:
+            case RELATIVE_PATH:
+               if (!fullPath.equals(options.getDir())) {
+                  executeOneIteration(containerName,
+                     options.clone().inDirectory(fullPath), semaphore,
+                     outstandingFutures, deleteFailure, /*blocking=*/ true);
                }
+               break;
+            case CONTAINER:
+               throw new IllegalArgumentException(
+                  "Container type not supported");
             }
          }
+      }
 
-         // remove blobs and now-empty subdirectories
-         Map<StorageMetadata, ListenableFuture<?>> responses = Maps.newHashMap();
-         for (final StorageMetadata md : listing) {
-            final String fullPath = parentIsFolder(options, md) ? options.getDir() + "/"
-                     + md.getName() : md.getName();
-            switch (md.getType()) {
-               case BLOB:
-                  responses.put(md, executorService.submit(new Runnable() {
-                     @Override
-                     public void run() {
-                        blobStore.removeBlob(containerName, fullPath);
-                     }
-                  }));
-                  break;
-               case FOLDER:
-                  if (options.isRecursive()) {
-                     responses.put(md, executorService.submit(new Runnable() {
-                        @Override
-                        public void run() {
-                           blobStore.deleteDirectory(containerName, fullPath);
-                        }
-                     }));
-                  }
-                  break;
-               case RELATIVE_PATH:
-                  if (options.isRecursive()) {
-                     responses.put(md, executorService.submit(new Runnable() {
-                        @Override
-                        public void run() {
-                           blobStore.deleteDirectory(containerName, md.getName());
-                        }
-                     }));
-                  }
-                  break;
-               case CONTAINER:
-                  throw new IllegalArgumentException("Container type not supported");
+      return listing;
+   }
+
+   /**
+    * Delete the blobs from a given PageSet. The PageSet may contain blobs or
+    * directories. If there are directories, they are expected to be empty.
+    *
+    * The logic of acquiring a semaphore, submitting a callable to the
+    * executorService and releasing the semaphore resides here.
+    *
+    * @param containerName
+    *           The container from which the objects are listed.
+    * @param options
+    *           The options used for getting the container listing.
+    * @param listing
+    *           The actual list of objects.
+    * @param semaphore
+    *           The semaphore used for making sure that only a certain number of
+    *           futures are outstanding.
+    * @param deleteFailure
+    *           This is set to true if any future used for deleting blobs
+    *           failed.
+    * @param outstandingFutures
+    *           The set of outstanding futures.
+    * @throws TimeoutException
+    *            If any blob deletion takes too long.
+    */
+   private void deleteBlobsAndEmptyDirs(final String containerName,
+         ListContainerOptions options,
+         PageSet<? extends StorageMetadata> listing, final Semaphore semaphore,
+         final AtomicBoolean deleteFailure,
+         final Set<ListenableFuture<Void>> outstandingFutures)
+         throws TimeoutException {
+      for (final StorageMetadata md : listing) {
+         final String fullPath = parentIsFolder(options, md) ? options.getDir()
+               + "/" + md.getName() : md.getName();
+
+         // Attempt to acquire a semaphore within the time limit. At least
+         // one outstanding future should complete within this period for the
+         // semaphore to be acquired.
+         try {
+            if (!semaphore.tryAcquire(maxTime, TimeUnit.MILLISECONDS)) {
+               throw new TimeoutException("Timeout waiting for semaphore");
             }
+         } catch (InterruptedException ie) {
+            logger.debug("Interrupted while deleting blobs");
+            Thread.currentThread().interrupt();
          }
 
-         try {
-            exceptions = awaitCompletion(responses, executorService, maxTime, logger, message);
-         } catch (TimeoutException te) {
-            ++numErrors;
-            if (numErrors == maxErrors) {
-               throw propagate(te);
+         final ListenableFuture<Void> blobDelFuture;
+         switch (md.getType()) {
+         case BLOB:
+            blobDelFuture = executorService.submit(new Callable<Void>() {
+               @Override
+               public Void call() {
+                  blobStore.removeBlob(containerName, fullPath);
+                  return null;
+               }
+            });
+            break;
+         case FOLDER:
+            if (options.isRecursive()) {
+               blobDelFuture = executorService.submit(new Callable<Void>() {
+                  @Override
+                  public Void call() {
+                     blobStore.deleteDirectory(containerName, fullPath);
+                     return null;
+                  }
+               });
+            } else {
+               blobDelFuture = null;
             }
-            retryHandler.imposeBackoffExponentialDelay(numErrors, message);
-            continue;
-         } finally {
-            for (ListenableFuture<?> future : responses.values()) {
-               future.cancel(true);
+            break;
+         case RELATIVE_PATH:
+            if (options.isRecursive()) {
+               blobDelFuture = executorService.submit(new Callable<Void>() {
+                  @Override
+                  public Void call() {
+                     blobStore.deleteDirectory(containerName, md.getName());
+                     return null;
+                  }
+               });
+            } else {
+               blobDelFuture = null;
             }
+            break;
+         case CONTAINER:
+            throw new IllegalArgumentException("Container type not supported");
+         default:
+            blobDelFuture = null;
          }
 
-         if (!exceptions.isEmpty()) {
-            ++numErrors;
-            if (numErrors == maxErrors) {
-               break;
-            }
-            retryHandler.imposeBackoffExponentialDelay(numErrors, message);
-            continue;
+         // If a future to delete a blob/directory actually got created above,
+         // keep a reference of that in the outstandingFutures list. This is
+         // useful in case of a timeout exception. All outstanding futures can
+         // then be cancelled.
+         if (blobDelFuture != null) {
+            outstandingFutures.add(blobDelFuture);
+
+            // Add a callback to release the semaphore. This is required for
+            // other threads waiting to acquire a semaphore above to make
+            // progress.
+            Futures.addCallback(blobDelFuture, new FutureCallback<Object>() {
+               @Override
+               public void onSuccess(final Object o) {
+                  outstandingFutures.remove(blobDelFuture);
+                  semaphore.release();
+               }
+
+               @Override
+               public void onFailure(final Throwable t) {
+                  // Make a note of the fact that some blob/directory could not
+                  // be deleted successfully. This is used for retrying later.
+                  deleteFailure.set(true);
+                  outstandingFutures.remove(blobDelFuture);
+                  semaphore.release();
+               }
+            });
+         } else {
+            // It is possible above to acquire a semaphore but not submit any
+            // task to the executorService. For e.g. if the listing contains
+            // an object of type 'FOLDER' and the ListContainerOptions are *not*
+            // recursive. In this case, there is no blobDelFuture and therefore
+            // no FutureCallback to release the semaphore. This semaphore is
+            // released here.
+            semaphore.release();
+         }
+      }
+   }
+
+   /**
+    * This method goes through all the blobs from a container and attempts to
+    * create futures for deleting them. On a TimeoutException it cancels the
+    * outstanding futures, sets deleteFailure and continues with the next page;
+    * if there are more retries left, this will get called again.
+    *
+    * @param containerName
+    *           The container from which to get the object list.
+    * @param listOptions
+    *           The options used for getting the listing.
+    * @param semaphore
+    *           The semaphore used for controlling number of outstanding
+    *           futures.
+    * @param outstandingFutures
+    *           The set of outstanding futures.
+    * @param deleteFailure
+    *           A flag used to track whether there was a failure while
+    *           deleting any blob.
+    * @param blocking
+    *           when true, block until all outstanding operations have
+    *           completed before returning; this method itself returns nothing.
+    */
+   @VisibleForTesting
+   void executeOneIteration(
+         final String containerName,
+         ListContainerOptions listOptions, final Semaphore semaphore,
+         final Set<ListenableFuture<Void>> outstandingFutures,
+         final AtomicBoolean deleteFailure, final boolean blocking) {
+      ListContainerOptions options = listOptions.clone();
+      String message = getMessage(containerName, listOptions);
+      if (options.isRecursive()) {
+         message += " recursively";
+      }
+      logger.debug(message);
+
+      PageSet<? extends StorageMetadata> listing = getListing(containerName,
+            options, semaphore, outstandingFutures, deleteFailure);
+      while (listing != null && !listing.isEmpty()) {
+         try {
+            // Remove blobs and now-empty subdirectories.
+            deleteBlobsAndEmptyDirs(containerName, options, listing, semaphore,
+                  deleteFailure, outstandingFutures);
+         } catch (TimeoutException te) {
+            logger.debug("TimeoutException while deleting blobs: {}",
+                  te.getMessage());
+            cancelOutstandingFutures(outstandingFutures);
+            deleteFailure.set(true);
          }
 
          String marker = listing.getNextMarker();
-         if (marker == null) {
+         if (marker != null) {
+            logger.debug("%s with marker %s", message, marker);
+            options = options.afterMarker(marker);
+            listing = getListing(containerName, options, semaphore,
+                  outstandingFutures, deleteFailure);
+         } else {
             break;
          }
-         logger.debug("%s with marker %s", message, marker);
-         options = options.afterMarker(marker);
-
-         // Reset numErrors if we execute a successful iteration.  This ensures
-         // that we only try an unsuccessful operation maxErrors times but
-         // allow progress with directories containing many blobs in the face
-         // of some failures.
-         numErrors = 0;
       }
-      if (!exceptions.isEmpty())
-         throw new BlobRuntimeException(String.format("error %s: %s", message, exceptions));
+
+      if (blocking) {
+         waitForCompletion(semaphore, outstandingFutures);
+      }
    }
 
-   private boolean parentIsFolder(final ListContainerOptions options, final StorageMetadata md) {
-      return options.getDir() != null && md.getName().indexOf('/') == -1;
+   private void waitForCompletion(final Semaphore semaphore,
+         final Set<ListenableFuture<Void>> outstandingFutures) {
+      // Wait for all futures to complete by waiting to acquire all
+      // semaphores.
+      try {
+         semaphore.acquire(maxParallelDeletes);
+         semaphore.release(maxParallelDeletes);
+      } catch (InterruptedException e) {
+         logger.debug("Interrupted while waiting for blobs to be deleted");
+         cancelOutstandingFutures(outstandingFutures);
+         Thread.currentThread().interrupt();
+      }
+   }
+
+   public void execute(final String containerName,
+         ListContainerOptions listOptions) {
+      final AtomicBoolean deleteFailure = new AtomicBoolean();
+      int retries = maxErrors;
+
+      /*
+       * A Semaphore is used to control the number of outstanding delete
+       * requests. One permit of the semaphore is acquired before submitting a
+       * request to the executorService to delete a blob. As requests complete,
+       * their FutureCallback will release the semaphore permit. That will allow
+       * the next delete request to proceed.
+       *
+       * If no Future completes in 'maxTime', i.e. a semaphore cannot be
+       * acquired in 'maxTime', a TimeoutException is thrown. Any outstanding
+       * futures at that time are cancelled.
+       */
+      final Semaphore semaphore = new Semaphore(maxParallelDeletes);
+      /*
+       * When a future is created, a reference for that is added to the
+       * outstandingFutures list. This reference is removed from the list in the
+       * FutureCallback since it no longer needs to be cancelled in the event of
+       * a timeout. Also, when the reference is removed from this list and when
+       * the executorService removes the reference that it has maintained, the
+       * future will be marked for GC since there should be no other references
+       * to it. This is important because this code can generate an unbounded
+       * number of futures.
+       */
+      final Set<ListenableFuture<Void>> outstandingFutures = Collections
+            .synchronizedSet(new HashSet<ListenableFuture<Void>>());
+      // TODO: Remove this retry loop.
+      while (retries > 0) {
+         deleteFailure.set(false);
+         executeOneIteration(containerName, listOptions.clone(), semaphore,
+               outstandingFutures, deleteFailure, /*blocking=*/ false);
+         waitForCompletion(semaphore, outstandingFutures);
+
+         // Try again if there was any failure while deleting blobs and the max
+         // retry count hasn't been reached.
+         if (deleteFailure.get() && --retries > 0) {
+            String message = getMessage(containerName, listOptions);
+            retryHandler.imposeBackoffExponentialDelay(maxErrors - retries,
+                  message);
+         } else {
+            break;
+         }
+      }
+
+      if (retries == 0) {
+         cancelOutstandingFutures(outstandingFutures);
+         throw new BlobRuntimeException("Exceeded maximum retry attempts");
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/jclouds/blob/655aa444/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInListTest.java
----------------------------------------------------------------------
diff --git a/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInListTest.java b/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInListTest.java
index 016fc23..10b74f3 100644
--- a/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInListTest.java
+++ b/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInListTest.java
@@ -16,35 +16,64 @@
  */
 package org.jclouds.blobstore.strategy.internal;
 
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.isA;
+import static org.easymock.EasyMock.replay;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
 import org.jclouds.ContextBuilder;
 import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.ContainerNotFoundException;
 import org.jclouds.blobstore.options.ListContainerOptions;
 import org.jclouds.util.Closeables2;
+import org.jclouds.blobstore.domain.PageSet;
+import org.jclouds.blobstore.domain.StorageMetadata;
+import org.jclouds.blobstore.internal.BlobRuntimeException;
+import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.inject.Injector;
 
 /**
- * 
+ *
  * @author Adrian Cole
  */
 @Test(testName = "DeleteAllKeysInListTest", singleThreaded = true)
 public class DeleteAllKeysInListTest {
    private BlobStore blobstore;
    private DeleteAllKeysInList deleter;
+   private BackoffLimitedRetryHandler retryHandler;
    private static final String containerName = "container";
    private static final String directoryName = "directory";
+   private static final int maxParallelDeletes = 1024;
 
    @BeforeMethod
    void setupBlobStore() {
-      Injector injector = ContextBuilder.newBuilder("transient").buildInjector();
+      Injector injector = ContextBuilder.newBuilder("transient")
+            .buildInjector();
       blobstore = injector.getInstance(BlobStore.class);
       deleter = injector.getInstance(DeleteAllKeysInList.class);
+      retryHandler = injector.getInstance(BackoffLimitedRetryHandler.class);
       createDataSet();
    }
 
@@ -73,6 +102,122 @@ public class DeleteAllKeysInListTest {
       assertEquals(blobstore.countBlobs(containerName), 1111);
    }
 
+   public void testContainerNotFound() {
+      IMocksControl mockControl = createControl();
+      BlobStore blobStore = mockControl.createMock(BlobStore.class);
+      ListeningExecutorService executorService = mockControl
+            .createMock(ListeningExecutorService.class);
+      DeleteAllKeysInList testDeleter = createMockBuilder(
+            DeleteAllKeysInList.class).withConstructor(executorService,
+            blobStore, retryHandler, maxParallelDeletes).createMock();
+      EasyMock.<PageSet<? extends StorageMetadata>> expect(blobStore.list(
+                  isA(String.class), isA(ListContainerOptions.class)))
+            .andThrow(new ContainerNotFoundException()).once();
+      replay(blobStore);
+      testDeleter.execute(containerName,
+            ListContainerOptions.Builder.recursive());
+      // No blobs will be deleted since blobStore.list will throw a
+      // ContainerNotFoundException.
+      assertEquals(blobstore.countBlobs(containerName), 3333);
+   }
+
+   @SuppressWarnings("unchecked")
+   public void testDeleteAfterFutureFailure() {
+      IMocksControl mockControl = createControl();
+      ListeningExecutorService executorService = mockControl
+            .createMock(ListeningExecutorService.class);
+      DeleteAllKeysInList testDeleter = createMockBuilder(
+            DeleteAllKeysInList.class).withConstructor(executorService,
+            blobstore, retryHandler, maxParallelDeletes).createMock();
+      // Fail the first future that is created for deleting blobs.
+      EasyMock.<ListenableFuture<?>> expect(
+                  executorService.submit(isA(Callable.class)))
+            .andReturn(
+                  Futures.<Void> immediateFailedFuture(new RuntimeException()))
+            .once();
+      // There should be at least another 3333 calls to executorService.submit
+      // since there are 3333 blobs.
+      EasyMock.expectLastCall().andReturn(Futures.<Void> immediateFuture(null))
+            .times(3333, Integer.MAX_VALUE);
+      replay(executorService);
+      testDeleter.execute(containerName,
+            ListContainerOptions.Builder.recursive());
+   }
+
+   @SuppressWarnings("unchecked")
+   public void testExceptionThrownAfterMaxRetries() {
+      IMocksControl mockControl = createControl();
+      ListeningExecutorService executorService = mockControl
+            .createMock(ListeningExecutorService.class);
+      DeleteAllKeysInList testDeleter = createMockBuilder(
+            DeleteAllKeysInList.class).withConstructor(executorService,
+            blobstore, retryHandler, maxParallelDeletes).createMock();
+      // Fail the first future that is created for deleting blobs.
+      EasyMock.<ListenableFuture<?>> expect(
+                  executorService.submit(isA(Callable.class)))
+            .andReturn(
+                  Futures.<Void> immediateFailedFuture(new RuntimeException()))
+            .once();
+      EasyMock.expectLastCall().andReturn(Futures.<Void> immediateFuture(null))
+            .anyTimes();
+      replay(executorService);
+      testDeleter.setMaxErrors(1);
+
+      boolean blobRunTimeExceptionThrown = false;
+      try {
+      testDeleter.execute(containerName,
+            ListContainerOptions.Builder.recursive());
+      } catch (BlobRuntimeException be) {
+         blobRunTimeExceptionThrown = true;
+      }
+
+      assertTrue(blobRunTimeExceptionThrown, "Expected a BlobRunTimeException");
+   }
+
+   @SuppressWarnings("unchecked")
+   public void testFuturesCancelledOnFailure() {
+      IMocksControl mockControl = createControl();
+      ListeningExecutorService executorService = mockControl
+            .createMock(ListeningExecutorService.class);
+      DeleteAllKeysInList testDeleter = createMockBuilder(
+            DeleteAllKeysInList.class).withConstructor(executorService,
+            blobstore, retryHandler, maxParallelDeletes).createMock();
+      final AtomicBoolean deleteFailure = new AtomicBoolean();
+      final Semaphore semaphore = createMock(Semaphore.class);
+      final Set<ListenableFuture<Void>> outstandingFutures = Collections
+            .synchronizedSet(new HashSet<ListenableFuture<Void>>());
+      final ListenableFuture<Void> blobDelFuture = createMock(ListenableFuture.class);
+      try {
+
+         // Allow the first semaphore acquire to succeed.
+         EasyMock.expect(semaphore.tryAcquire(Long.MAX_VALUE,
+               TimeUnit.MILLISECONDS)).andReturn(true).once();
+         EasyMock.<ListenableFuture<?>> expect(
+                  executorService.submit(isA(Callable.class)))
+            .andReturn(blobDelFuture).once();
+
+         // Fail the second semaphore acquire.
+         EasyMock.expect(semaphore.tryAcquire(Long.MAX_VALUE,
+               TimeUnit.MILLISECONDS))
+               .andReturn(false).anyTimes();
+
+         blobDelFuture.addListener(isA(Runnable.class), isA(Executor.class));
+         EasyMock.expectLastCall();
+         EasyMock.expect(blobDelFuture.cancel(true)).andReturn(true)
+               .atLeastOnce();
+      } catch (InterruptedException e) {
+         fail();
+      }
+
+      replay(semaphore, executorService, blobDelFuture);
+      testDeleter.setMaxErrors(1);
+      testDeleter.executeOneIteration(containerName,
+            ListContainerOptions.Builder.recursive(), semaphore,
+            outstandingFutures, deleteFailure, /* blocking = */false);
+      assertEquals(outstandingFutures.size(), 1);
+      assertTrue(deleteFailure.get());
+   }
+
    /**
     * Create a container "container" with 1111 blobs named "blob-%d".  Create a
     * subdirectory "directory" which contains 2222 more blobs named

http://git-wip-us.apache.org/repos/asf/jclouds/blob/655aa444/core/src/main/java/org/jclouds/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/jclouds/Constants.java b/core/src/main/java/org/jclouds/Constants.java
index 2dc2349..45d77cf 100644
--- a/core/src/main/java/org/jclouds/Constants.java
+++ b/core/src/main/java/org/jclouds/Constants.java
@@ -299,4 +299,9 @@ public interface Constants {
     * providers that don't properly support Expect headers. Defaults to false.
     */
    public static final String PROPERTY_STRIP_EXPECT_HEADER = "jclouds.strip-expect-header";
+
+   /**
+    * The maximum number of blob deletes happening in parallel at any point in time.
+    */
+   public static final String PROPERTY_MAX_PARALLEL_DELETES = "jclouds.max-parallel-deletes";
 }

http://git-wip-us.apache.org/repos/asf/jclouds/blob/655aa444/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java b/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java
index a814c61..f8bc285 100644
--- a/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java
+++ b/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java
@@ -24,6 +24,7 @@ import static org.jclouds.Constants.PROPERTY_ISO3166_CODES;
 import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_CONTEXT;
 import static org.jclouds.Constants.PROPERTY_MAX_CONNECTIONS_PER_HOST;
 import static org.jclouds.Constants.PROPERTY_MAX_CONNECTION_REUSE;
+import static org.jclouds.Constants.PROPERTY_MAX_PARALLEL_DELETES;
 import static org.jclouds.Constants.PROPERTY_MAX_SESSION_FAILURES;
 import static org.jclouds.Constants.PROPERTY_PRETTY_PRINT_PAYLOADS;
 import static org.jclouds.Constants.PROPERTY_SCHEDULER_THREADS;
@@ -60,6 +61,8 @@ public abstract class BaseApiMetadata implements ApiMetadata {
    public static Properties defaultProperties() {
       Properties props = new Properties();
       // TODO: move this to ApiMetadata
+      final int numUserThreads = 50;
+
       props.setProperty(PROPERTY_ISO3166_CODES, "");
       props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_CONTEXT, 20 + "");
       props.setProperty(PROPERTY_MAX_CONNECTIONS_PER_HOST, 0 + "");
@@ -67,16 +70,20 @@ public abstract class BaseApiMetadata implements ApiMetadata {
       props.setProperty(PROPERTY_CONNECTION_TIMEOUT, 60000 + "");
       props.setProperty(PROPERTY_IO_WORKER_THREADS, 20 + "");
       // Successfully tested 50 user threads with BlobStore.clearContainer.
-      props.setProperty(PROPERTY_USER_THREADS, 50 + "");
+      props.setProperty(PROPERTY_USER_THREADS, numUserThreads + "");
       props.setProperty(PROPERTY_SCHEDULER_THREADS, 10 + "");
       props.setProperty(PROPERTY_MAX_CONNECTION_REUSE, 75 + "");
       props.setProperty(PROPERTY_MAX_SESSION_FAILURES, 2 + "");
       props.setProperty(PROPERTY_SESSION_INTERVAL, 60 + "");
       props.setProperty(PROPERTY_PRETTY_PRINT_PAYLOADS, "true");
       props.setProperty(PROPERTY_STRIP_EXPECT_HEADER, "false");
+
+      // By default, the maximum number of parallel deletes equals the number
+      // of user threads, since one thread is used to delete one blob.
+      props.setProperty(PROPERTY_MAX_PARALLEL_DELETES, numUserThreads + "");
       return props;
    }
-   
+
    public abstract static class Builder<T extends Builder<T>> implements ApiMetadata.Builder<T> {
       protected abstract T self();