You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/09/28 07:21:36 UTC

[2/9] storm git commit: STORM-2084: Refactor localization to combine files together

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java b/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
deleted file mode 100644
index 353ab56..0000000
--- a/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
+++ /dev/null
@@ -1,695 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.localizer;
-
-import org.apache.storm.Config;
-import org.apache.storm.DaemonConfig;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.ServerUtils;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.ShellUtils.ExitCodeException;
-import org.apache.storm.utils.ShellUtils.ShellCommandExecutor;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Collection;
-import java.util.ArrayList;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-
-/**
- * Class to download and manage files from the blobstore.  It uses an LRU cache
- * to determine which files to keep so they can be reused and which files to delete.
- */
-public class Localizer {
-  public static final Logger LOG = LoggerFactory.getLogger(Localizer.class);
-  public static final String FILECACHE = "filecache";
-  public static final String USERCACHE = "usercache";
-  // sub directories to store either files or uncompressed archives respectively
-  public static final String FILESDIR = "files";
-  public static final String ARCHIVESDIR = "archives";
-
-  private static final String TO_UNCOMPRESS = "_tmp_";
-  
-  
-  
-  private final Map<String, Object> _conf;
-  private final int _threadPoolSize;
-  // thread pool for initial download
-  private final ExecutorService _execService;
-  // thread pool for updates
-  private final ExecutorService _updateExecService;
-  private final int _blobDownloadRetries;
-
-  // track resources - user to resourceSet
-  private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new
-      ConcurrentHashMap<String, LocalizedResourceSet>();
-
-  private final String _localBaseDir;
-
-  // cleanup
-  private long _cacheTargetSize;
-  private long _cacheCleanupPeriod;
-  private ScheduledExecutorService _cacheCleanupService;
-
-  public Localizer(Map<String, Object> conf, String baseDir) {
-    _conf = conf;
-    _localBaseDir = baseDir;
-    // default cache size 10GB, converted to Bytes
-    _cacheTargetSize = ObjectReader.getInt(_conf.get(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB),
-            10 * 1024).longValue() << 20;
-    // default 10 minutes.
-    _cacheCleanupPeriod = ObjectReader.getInt(_conf.get(
-            DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 10 * 60 * 1000).longValue();
-
-    // if we needed we could make config for update thread pool size
-    _threadPoolSize = ObjectReader.getInt(_conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
-    _blobDownloadRetries = ObjectReader.getInt(_conf.get(
-            DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
-
-    _execService = Executors.newFixedThreadPool(_threadPoolSize);
-    _updateExecService = Executors.newFixedThreadPool(_threadPoolSize);
-    reconstructLocalizedResources();
-  }
-
-  // For testing, it allows setting size in bytes
-  protected void setTargetCacheSize(long size) {
-    _cacheTargetSize = size;
-  }
-
-  // For testing, be careful as it doesn't clone
-  ConcurrentMap<String, LocalizedResourceSet> getUserResources() {
-    return _userRsrc;
-  }
-
-  public void startCleaner() {
-    _cacheCleanupService = new ScheduledThreadPoolExecutor(1,
-        new ThreadFactoryBuilder()
-            .setNameFormat("Localizer Cache Cleanup")
-            .build());
-
-    _cacheCleanupService.scheduleWithFixedDelay(new Runnable() {
-          @Override
-          public void run() {
-            handleCacheCleanup();
-          }
-        }, _cacheCleanupPeriod, _cacheCleanupPeriod, TimeUnit.MILLISECONDS);
-  }
-
-  public void shutdown() {
-    if (_cacheCleanupService != null) {
-      _cacheCleanupService.shutdown();
-    }
-    if (_execService != null) {
-      _execService.shutdown();
-    }
-    if (_updateExecService != null) {
-      _updateExecService.shutdown();
-    }
-  }
-
-  // baseDir/supervisor/usercache/
-  protected File getUserCacheDir() {
-    return new File(_localBaseDir, USERCACHE);
-  }
-
-  // baseDir/supervisor/usercache/user1/
-  protected File getLocalUserDir(String userName) {
-    return new File(getUserCacheDir(), userName);
-  }
-
-  // baseDir/supervisor/usercache/user1/filecache
-  public File getLocalUserFileCacheDir(String userName) {
-    return new File(getLocalUserDir(userName), FILECACHE);
-  }
-
-  // baseDir/supervisor/usercache/user1/filecache/files
-  protected File getCacheDirForFiles(File dir) {
-    return new File(dir, FILESDIR);
-  }
-
-  // get the directory to put uncompressed archives in
-  // baseDir/supervisor/usercache/user1/filecache/archives
-  protected File getCacheDirForArchives(File dir) {
-    return new File(dir, ARCHIVESDIR);
-  }
-
-  protected void addLocalizedResourceInDir(String dir, LocalizedResourceSet lrsrcSet,
-      boolean uncompress) {
-    File[] lrsrcs = readCurrentBlobs(dir);
-
-    if (lrsrcs != null) {
-      for (File rsrc : lrsrcs) {
-        LOG.info("add localized in dir found: " + rsrc);
-        /// strip off .suffix
-        String path = rsrc.getPath();
-        int p = path.lastIndexOf('.');
-        if (p > 0) {
-          path = path.substring(0, p);
-        }
-        LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path);
-        LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path,
-            uncompress);
-        lrsrcSet.add(lrsrc.getKey(), lrsrc, uncompress);
-      }
-    }
-  }
-
-  // Looks for files in the directory with .current suffix
-  protected File[] readCurrentBlobs(String location) {
-    File dir = new File(location);
-    File[] files = null;
-    if (dir.exists()) {
-      files = dir.listFiles(new FilenameFilter() {
-        @Override
-        public boolean accept(File dir, String name) {
-          return name.toLowerCase().endsWith(ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-        }
-      });
-    }
-    return files;
-  }
-
-  // Check to see if there are any existing files already localized.
-  protected void reconstructLocalizedResources() {
-    try {
-      LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath());
-      Collection<File> users = ConfigUtils.readDirFiles(getUserCacheDir().getPath());
-      if (!(users == null || users.isEmpty())) {
-        for (File userDir : users) {
-          String user = userDir.getName();
-          LOG.debug("looking in: {} for user: {}", userDir.getPath(), user);
-          LocalizedResourceSet newSet = new LocalizedResourceSet(user);
-          LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
-          if (lrsrcSet == null) {
-            lrsrcSet = newSet;
-          }
-          addLocalizedResourceInDir(getCacheDirForFiles(getLocalUserFileCacheDir(user)).getPath(),
-              lrsrcSet, false);
-          addLocalizedResourceInDir(
-              getCacheDirForArchives(getLocalUserFileCacheDir(user)).getPath(),
-              lrsrcSet, true);
-        }
-      } else {
-        LOG.warn("No left over resources found for any user during reconstructing of local resources at: {}", getUserCacheDir().getPath());
-      }
-    } catch (Exception e) {
-      LOG.error("ERROR reconstructing localized resources", e);
-    }
-  }
-
-  // ignores invalid user/topo/key
-  public synchronized void removeBlobReference(String key, String user, String topo,
-      boolean uncompress) throws AuthorizationException, KeyNotFoundException {
-    LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
-    if (lrsrcSet != null) {
-      LocalizedResource lrsrc = lrsrcSet.get(key, uncompress);
-      if (lrsrc != null) {
-        LOG.debug("removing blob reference to: {} for topo: {}", key, topo);
-        lrsrc.removeReference(topo);
-      } else {
-        LOG.warn("trying to remove non-existent blob, key: " + key + " for user: " + user +
-            " topo: " + topo);
-      }
-    } else {
-      LOG.warn("trying to remove blob for non-existent resource set for user: " + user + " key: "
-          + key + " topo: " + topo);
-    }
-  }
-
-  public synchronized void addReferences(List<LocalResource> localresource, String user,
-       String topo) {
-    LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
-    if (lrsrcSet != null) {
-      for (LocalResource blob : localresource) {
-        LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName(), blob.shouldUncompress());
-        if (lrsrc != null) {
-          lrsrc.addReference(topo);
-          LOG.debug("added reference for topo: {} key: {}", topo, blob);
-        } else {
-          LOG.warn("trying to add reference to non-existent blob, key: " + blob + " topo: " + topo);
-        }
-      }
-    } else {
-      LOG.warn("trying to add reference to non-existent local resource set, " +
-          "user: " + user + " topo: " + topo);
-    }
-  }
-
-  /**
-   * This function either returns the blob in the existing cache or if it doesn't exist in the
-   * cache, it will download the blob and will block until the download is complete.
-   */
-  public LocalizedResource getBlob(LocalResource localResource, String user, String topo,
-       File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException {
-    ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
-    arr.add(localResource);
-    List<LocalizedResource> results = getBlobs(arr, user, topo, userFileDir);
-    if (results.isEmpty() || results.size() != 1) {
-      throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + user +
-          ", topo: " + topo);
-    }
-    return results.get(0);
-  }
-
-  protected boolean isLocalizedResourceDownloaded(LocalizedResource lrsrc) {
-    File rsrcFileCurrent = new File(lrsrc.getCurrentSymlinkPath());
-    File rsrcFileWithVersion = new File(lrsrc.getFilePathWithVersion());
-    File versionFile = new File(lrsrc.getVersionFilePath());
-    return (rsrcFileWithVersion.exists() && rsrcFileCurrent.exists() && versionFile.exists());
-  }
-
-  protected boolean isLocalizedResourceUpToDate(LocalizedResource lrsrc,
-      ClientBlobStore blobstore) throws AuthorizationException, KeyNotFoundException {
-    String localFile = lrsrc.getFilePath();
-    long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(lrsrc.getKey(), blobstore);
-    long currentBlobVersion = ServerUtils.localVersionOfBlob(localFile);
-    return (nimbusBlobVersion == currentBlobVersion);
-  }
-
-  protected ClientBlobStore getClientBlobStore() {
-    return ServerUtils.getClientBlobStoreForSupervisor(_conf);
-  }
-
-  /**
-   * This function updates blobs on the supervisor. It uses a separate thread pool and runs
-   * asynchronously of the download and delete.
-   */
-  public List<LocalizedResource> updateBlobs(List<LocalResource> localResources,
-       String user) throws AuthorizationException, KeyNotFoundException, IOException {
-    LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
-    ArrayList<LocalizedResource> results = new ArrayList<>();
-    ArrayList<Callable<LocalizedResource>> updates = new ArrayList<>();
-
-    if (lrsrcSet == null) {
-      // resource set must have been removed
-      return results;
-    }
-    ClientBlobStore blobstore = null;
-    try {
-      blobstore = getClientBlobStore();
-      for (LocalResource localResource: localResources) {
-        String key = localResource.getBlobName();
-        LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
-        if (lrsrc == null) {
-          LOG.warn("blob requested for update doesn't exist: {}", key);
-          continue;
-        } else if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
-          LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
-          continue;
-        } else {
-          // update it if either the version isn't the latest or if any local blob files are missing
-          if (!isLocalizedResourceUpToDate(lrsrc, blobstore) ||
-              !isLocalizedResourceDownloaded(lrsrc)) {
-            LOG.debug("updating blob: {}", key);
-            updates.add(new DownloadBlob(this, _conf, key, new File(lrsrc.getFilePath()), user,
-                lrsrc.isUncompressed(), true));
-          }
-        }
-      }
-    } finally {
-      if(blobstore != null) {
-        blobstore.shutdown();
-      }
-    }
-    try {
-      List<Future<LocalizedResource>> futures = _updateExecService.invokeAll(updates);
-      for (Future<LocalizedResource> futureRsrc : futures) {
-        try {
-          LocalizedResource lrsrc = futureRsrc.get();
-          // put the resource just in case it was removed at same time by the cleaner
-          LocalizedResourceSet newSet = new LocalizedResourceSet(user);
-          LocalizedResourceSet newlrsrcSet = _userRsrc.putIfAbsent(user, newSet);
-          if (newlrsrcSet == null) {
-            newlrsrcSet = newSet;
-          }
-          newlrsrcSet.putIfAbsent(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
-          results.add(lrsrc);
-        }
-        catch (ExecutionException e) {
-          LOG.error("Error updating blob: ", e);
-          if (e.getCause() instanceof AuthorizationException) {
-            throw (AuthorizationException)e.getCause();
-          }
-          if (e.getCause() instanceof KeyNotFoundException) {
-            throw (KeyNotFoundException)e.getCause();
-          }
-        }
-      }
-    } catch (RejectedExecutionException re) {
-      LOG.error("Error updating blobs : ", re);
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted Exception", ie);
-    }
-    return results;
-  }
-
-  /**
-   * This function either returns the blobs in the existing cache or if they don't exist in the
-   * cache, it downloads them in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT)
-   * and will block until all of them have been downloaded
-   */
-  public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources,
-      String user, String topo, File userFileDir)
-      throws AuthorizationException, KeyNotFoundException, IOException {
-    if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
-      throw new KeyNotFoundException("symlinks are disabled so blobs cannot be downloaded.");
-    }
-    LocalizedResourceSet newSet = new LocalizedResourceSet(user);
-    LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
-    if (lrsrcSet == null) {
-      lrsrcSet = newSet;
-    }
-    ArrayList<LocalizedResource> results = new ArrayList<>();
-    ArrayList<Callable<LocalizedResource>> downloads = new ArrayList<>();
-
-    ClientBlobStore blobstore = null;
-    try {
-      blobstore = getClientBlobStore();
-      for (LocalResource localResource: localResources) {
-        String key = localResource.getBlobName();
-        boolean uncompress = localResource.shouldUncompress();
-        LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
-        boolean isUpdate = false;
-        if ((lrsrc != null) && (lrsrc.isUncompressed() == localResource.shouldUncompress()) &&
-            (isLocalizedResourceDownloaded(lrsrc))) {
-          if (isLocalizedResourceUpToDate(lrsrc, blobstore)) {
-            LOG.debug("blob already exists: {}", key);
-            lrsrc.addReference(topo);
-            results.add(lrsrc);
-            continue;
-          }
-          LOG.debug("blob exists but isn't up to date: {}", key);
-          isUpdate = true;
-        }
-
-        // go off to blobstore and get it
-        // assume dir passed in exists and has correct permission
-        LOG.debug("fetching blob: {}", key);
-        File downloadDir = getCacheDirForFiles(userFileDir);
-        File localFile = new File(downloadDir, key);
-        if (uncompress) {
-          // for compressed file, download to archives dir
-          downloadDir = getCacheDirForArchives(userFileDir);
-          localFile = new File(downloadDir, key);
-        }
-        downloadDir.mkdir();
-        downloads.add(new DownloadBlob(this, _conf, key, localFile, user, uncompress,
-            isUpdate));
-      }
-    } finally {
-      if(blobstore !=null) {
-        blobstore.shutdown();
-      }
-    }
-    try {
-      List<Future<LocalizedResource>> futures = _execService.invokeAll(downloads);
-      for (Future<LocalizedResource> futureRsrc: futures) {
-        LocalizedResource lrsrc = futureRsrc.get();
-        lrsrc.addReference(topo);
-        lrsrcSet.add(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
-        results.add(lrsrc);
-      }
-    } catch (ExecutionException e) {
-      if (e.getCause() instanceof AuthorizationException)
-        throw (AuthorizationException)e.getCause();
-      else if (e.getCause() instanceof KeyNotFoundException) {
-        throw (KeyNotFoundException)e.getCause();
-      } else {
-        throw new IOException("Error getting blobs", e);
-      }
-    } catch (RejectedExecutionException re) {
-      throw new IOException("RejectedExecutionException: ", re);
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted Exception", ie);
-    }
-    return results;
-  }
-
-  static class DownloadBlob implements Callable<LocalizedResource> {
-
-    private Localizer _localizer;
-    private Map _conf;
-    private String _key;
-    private File _localFile;
-    private String _user;
-    private boolean _uncompress;
-    private boolean _isUpdate;
-
-    public DownloadBlob(Localizer localizer, Map<String, Object> conf, String key, File localFile,
-        String user, boolean uncompress, boolean update) {
-      _localizer = localizer;
-      _conf = conf;
-      _key = key;
-      _localFile = localFile;
-      _user = user;
-      _uncompress = uncompress;
-      _isUpdate = update;
-    }
-
-    @Override
-    public LocalizedResource call()
-        throws AuthorizationException, KeyNotFoundException, IOException  {
-      return _localizer.downloadBlob(_conf, _key, _localFile, _user, _uncompress,
-        _isUpdate);
-    }
-  }
-
-  private LocalizedResource downloadBlob(Map<String, Object> conf, String key, File localFile,
-      String user, boolean uncompress, boolean isUpdate)
-      throws AuthorizationException, KeyNotFoundException, IOException {
-    ClientBlobStore blobstore = null;
-    try {
-      blobstore = getClientBlobStore();
-      long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(key, blobstore);
-      long oldVersion = ServerUtils.localVersionOfBlob(localFile.toString());
-      FileOutputStream out = null;
-      PrintWriter writer = null;
-      int numTries = 0;
-      String localizedPath = localFile.toString();
-      String localFileWithVersion = ServerUtils.constructBlobWithVersionFileName(localFile.toString(),
-              nimbusBlobVersion);
-      String localVersionFile = ServerUtils.constructVersionFileName(localFile.toString());
-      String downloadFile = localFileWithVersion;
-      if (uncompress) {
-        // we need to download to temp file and then unpack into the one requested
-        downloadFile = new File(localFile.getParent(), TO_UNCOMPRESS + localFile.getName()).toString();
-      }
-      while (numTries < _blobDownloadRetries) {
-        out = new FileOutputStream(downloadFile);
-        numTries++;
-        try {
-          if (!ServerUtils.canUserReadBlob(blobstore.getBlobMeta(key), user, conf)) {
-            throw new AuthorizationException(user + " does not have READ access to " + key);
-          }
-          InputStreamWithMeta in = blobstore.getBlob(key);
-          byte[] buffer = new byte[1024];
-          int len;
-          while ((len = in.read(buffer)) >= 0) {
-            out.write(buffer, 0, len);
-          }
-          out.close();
-          in.close();
-          if (uncompress) {
-            ServerUtils.unpack(new File(downloadFile), new File(localFileWithVersion));
-            LOG.debug("uncompressed " + downloadFile + " to: " + localFileWithVersion);
-          }
-
-          // Next write the version.
-          LOG.info("Blob: " + key + " updated with new Nimbus-provided version: " +
-              nimbusBlobVersion + " local version was: " + oldVersion);
-          // The false parameter ensures overwriting the version file, not appending
-          writer = new PrintWriter(
-              new BufferedWriter(new FileWriter(localVersionFile, false)));
-          writer.println(nimbusBlobVersion);
-          writer.close();
-
-          try {
-            setBlobPermissions(conf, user, localFileWithVersion);
-            setBlobPermissions(conf, user, localVersionFile);
-
-            // Update the key.current symlink. First create tmp symlink and do
-            // move of tmp to current so that the operation is atomic.
-            String tmp_uuid_local = java.util.UUID.randomUUID().toString();
-            LOG.debug("Creating a symlink @" + localFile + "." + tmp_uuid_local + " , " +
-                "linking to: " + localFile + "." + nimbusBlobVersion);
-            File uuid_symlink = new File(localFile + "." + tmp_uuid_local);
-
-            Files.createSymbolicLink(uuid_symlink.toPath(),
-                Paths.get(ServerUtils.constructBlobWithVersionFileName(localFile.toString(),
-                        nimbusBlobVersion)));
-            File current_symlink = new File(ServerUtils.constructBlobCurrentSymlinkName(
-                    localFile.toString()));
-            Files.move(uuid_symlink.toPath(), current_symlink.toPath(), ATOMIC_MOVE);
-          } catch (IOException e) {
-            // if we fail after writing the version file but before we move current link we need to
-            // restore the old version to the file
-            try {
-              PrintWriter restoreWriter = new PrintWriter(
-                  new BufferedWriter(new FileWriter(localVersionFile, false)));
-              restoreWriter.println(oldVersion);
-              restoreWriter.close();
-            } catch (IOException ignore) {}
-            throw e;
-          }
-
-          String oldBlobFile = localFile + "." + oldVersion;
-          try {
-            // Remove the old version. Note that if a number of processes have that file open,
-            // the OS will keep the old blob file around until they all close the handle and only
-            // then deletes it. No new process will open the old blob, since the users will open the
-            // blob through the "blob.current" symlink, which always points to the latest version of
-            // a blob. Remove the old version after the current symlink is updated as to not affect
-            // anyone trying to read it.
-            if ((oldVersion != -1) && (oldVersion != nimbusBlobVersion)) {
-              LOG.info("Removing an old blob file:" + oldBlobFile);
-              Files.delete(Paths.get(oldBlobFile));
-            }
-          } catch (IOException e) {
-            // At this point we have downloaded everything and moved symlinks.  If the remove of
-            // old fails just log an error
-            LOG.error("Exception removing old blob version: " + oldBlobFile);
-          }
-
-          break;
-        } catch (AuthorizationException ae) {
-          // we consider this non-retriable exceptions
-          if (out != null) {
-            out.close();
-          }
-          new File(downloadFile).delete();
-          throw ae;
-        } catch (IOException | KeyNotFoundException e) {
-          if (out != null) {
-            out.close();
-          }
-          if (writer != null) {
-            writer.close();
-          }
-          new File(downloadFile).delete();
-          if (uncompress) {
-            try {
-              FileUtils.deleteDirectory(new File(localFileWithVersion));
-            } catch (IOException ignore) {}
-          }
-          if (!isUpdate) {
-            // don't want to remove existing version file if its an update
-            new File(localVersionFile).delete();
-          }
-
-          if (numTries < _blobDownloadRetries) {
-            LOG.error("Failed to download blob, retrying", e);
-          } else {
-            throw e;
-          }
-        }
-      }
-      return new LocalizedResource(key, localizedPath, uncompress);
-    } finally {
-      if(blobstore != null) {
-        blobstore.shutdown();
-      }
-    }
-  }
-
-  public void setBlobPermissions(Map<String, Object> conf, String user, String path)
-      throws IOException {
-
-    if (!ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
-      return;
-    }
-    String wlCommand = ObjectReader.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), "");
-    if (wlCommand.isEmpty()) {
-      String stormHome = System.getProperty("storm.home");
-      wlCommand = stormHome + "/bin/worker-launcher";
-    }
-    List<String> command = new ArrayList<String>(Arrays.asList(wlCommand, user, "blob", path));
-
-    String[] commandArray = command.toArray(new String[command.size()]);
-    ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
-    LOG.info("Setting blob permissions, command: {}", Arrays.toString(commandArray));
-
-    try {
-      shExec.execute();
-      LOG.debug("output: {}", shExec.getOutput());
-    } catch (ExitCodeException e) {
-      int exitCode = shExec.getExitCode();
-      LOG.warn("Exit code from worker-launcher is : " + exitCode, e);
-      LOG.debug("output: {}", shExec.getOutput());
-      throw new IOException("Setting blob permissions failed" +
-          " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
-    }
-  }
-
-
-  public synchronized void handleCacheCleanup() {
-    LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(_cacheTargetSize);
-    // need one large set of all and then clean via LRU
-    for (LocalizedResourceSet t : _userRsrc.values()) {
-      toClean.addResources(t);
-      LOG.debug("Resources to be cleaned after adding {} : {}", t.getUser(), toClean);
-    }
-    toClean.cleanup();
-    LOG.debug("Resource cleanup: {}", toClean);
-    for (LocalizedResourceSet t : _userRsrc.values()) {
-      if (t.getSize() == 0) {
-        String user = t.getUser();
-
-        LOG.debug("removing empty set: {}", user);
-        File userFileCacheDir = getLocalUserFileCacheDir(user);
-        getCacheDirForFiles(userFileCacheDir).delete();
-        getCacheDirForArchives(userFileCacheDir).delete();
-        getLocalUserFileCacheDir(user).delete();
-        boolean dirsRemoved = getLocalUserDir(user).delete();
-        // to catch race with update thread
-        if (dirsRemoved) {
-          _userRsrc.remove(user);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 311acda..7b127d2 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -33,23 +33,26 @@ import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.blobstore.LocalFsBlobStore;
 import org.apache.storm.daemon.StormCommon;
-import org.apache.storm.generated.*;
-import org.apache.storm.localizer.Localizer;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.thrift.TException;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
@@ -61,7 +64,6 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -285,10 +287,6 @@ public class ServerUtils {
         return Files.getOwner(FileSystems.getDefault().getPath(path)).getName();
     }
 
-    public static Localizer createLocalizer(Map<String, Object> conf, String baseDir) {
-        return new Localizer(conf, baseDir);
-    }
-
     public static String containerFilePath (String dir) {
         return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index eb25566..60b628e 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -25,7 +25,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.storm.daemon.supervisor.Slot.StaticState;
@@ -39,7 +39,7 @@ import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.generated.ProfileAction;
 import org.apache.storm.generated.ProfileRequest;
 import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.AsyncLocalizer;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Time;
@@ -115,7 +115,7 @@ public class SlotTest {
     @Test
     public void testEmptyToEmpty() throws Exception {
         try (SimulatedTime t = new SimulatedTime(1010)){
-            ILocalizer localizer = mock(ILocalizer.class);
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             LocalState state = mock(LocalState.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
             ISupervisor iSuper = mock(ISupervisor.class);
@@ -137,7 +137,7 @@ public class SlotTest {
             LocalAssignment newAssignment = 
                     mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0));
             
-            ILocalizer localizer = mock(ILocalizer.class);
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             Container container = mock(Container.class);
             LocalState state = mock(LocalState.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -146,11 +146,11 @@ public class SlotTest {
             when(container.readHeartbeat()).thenReturn(hb, hb);
             
             @SuppressWarnings("unchecked")
-            Future<Void> baseFuture = mock(Future.class);
+            CompletableFuture<Void> baseFuture = mock(CompletableFuture.class);
             when(localizer.requestDownloadBaseTopologyBlobs(newAssignment, port)).thenReturn(baseFuture);
             
             @SuppressWarnings("unchecked")
-            Future<Void> blobFuture = mock(Future.class);
+            CompletableFuture<Void> blobFuture = mock(CompletableFuture.class);
             when(localizer.requestDownloadTopologyBlobs(newAssignment, port)).thenReturn(blobFuture);
             
             ISupervisor iSuper = mock(ISupervisor.class);
@@ -220,7 +220,7 @@ public class SlotTest {
             LocalAssignment assignment = 
                     mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0));
             
-            ILocalizer localizer = mock(ILocalizer.class);
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             Container container = mock(Container.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
             LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs()-10);
@@ -276,7 +276,7 @@ public class SlotTest {
             LocalAssignment nAssignment = 
                     mkLocalAssignment(nTopoId, nExecList, mkWorkerResources(100.0, 100.0, 100.0));
             
-            ILocalizer localizer = mock(ILocalizer.class);
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             Container nContainer = mock(Container.class);
             LocalState state = mock(LocalState.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -285,11 +285,11 @@ public class SlotTest {
             when(nContainer.readHeartbeat()).thenReturn(nhb, nhb);
             
             @SuppressWarnings("unchecked")
-            Future<Void> baseFuture = mock(Future.class);
+            CompletableFuture<Void> baseFuture = mock(CompletableFuture.class);
             when(localizer.requestDownloadBaseTopologyBlobs(nAssignment, port)).thenReturn(baseFuture);
             
             @SuppressWarnings("unchecked")
-            Future<Void> blobFuture = mock(Future.class);
+            CompletableFuture<Void> blobFuture = mock(CompletableFuture.class);
             when(localizer.requestDownloadTopologyBlobs(nAssignment, port)).thenReturn(blobFuture);
             
             ISupervisor iSuper = mock(ISupervisor.class);
@@ -377,7 +377,7 @@ public class SlotTest {
             when(cContainer.readHeartbeat()).thenReturn(chb);
             when(cContainer.areAllProcessesDead()).thenReturn(false, true);
             
-            ILocalizer localizer = mock(ILocalizer.class);
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
             
             ISupervisor iSuper = mock(ISupervisor.class);
@@ -437,7 +437,7 @@ public class SlotTest {
             when(cContainer.readHeartbeat()).thenReturn(chb, chb, chb, chb, chb, chb);
             when(cContainer.runProfiling(any(ProfileRequest.class), anyBoolean())).thenReturn(true);
             
-            ILocalizer localizer = mock(ILocalizer.class);
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
             
             ISupervisor iSuper = mock(ISupervisor.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 2461b33..f49be63 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -15,24 +15,51 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.localizer;
 
+import static org.apache.storm.localizer.AsyncLocalizer.USERCACHE;
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.*;
 
+import com.google.common.base.Joiner;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.blobstore.LocalFsBlobStore;
 import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Utils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.storm.Config;
@@ -41,9 +68,16 @@ import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.security.auth.DefaultPrincipalToLocal;
+import org.mockito.Mockito;
 
 public class AsyncLocalizerTest {
 
+    private static String getTestLocalizerRoot() {
+        File f = new File("./target/" + Thread.currentThread().getStackTrace()[2].getMethodName() + "/localizer/");
+        f.deleteOnExit();
+        return f.getPath();
+    }
+
     @Test
     public void testRequestDownloadBaseTopologyBlobs() throws Exception {
         final String topoId = "TOPO";
@@ -68,7 +102,6 @@ public class AsyncLocalizerTest {
         conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName());
         conf.put(Config.STORM_CLUSTER_MODE, "distributed");
         conf.put(Config.STORM_LOCAL_DIR, stormLocal);
-        Localizer localizer = mock(Localizer.class);
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         ConfigUtils mockedCU = mock(ConfigUtils.class);
         ReflectionUtils mockedRU = mock(ReflectionUtils.class);
@@ -76,7 +109,7 @@ public class AsyncLocalizerTest {
         
         Map<String, Object> topoConf = new HashMap<>(conf);
         
-        AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops);
+        AsyncLocalizer bl = new AsyncLocalizer(conf, getTestLocalizerRoot(), ops, new AtomicReference<>(new HashMap<>()), null);
         ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
         ReflectionUtils origRU = ReflectionUtils.setInstance(mockedRU);
         ServerUtils origUtils = ServerUtils.setInstance(mockedU);
@@ -86,7 +119,7 @@ public class AsyncLocalizerTest {
             when(mockedRU.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);
             when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
 
-            Future<Void> f = al.requestDownloadBaseTopologyBlobs(la, port);
+            Future<Void> f = bl.requestDownloadBaseTopologyBlobs(la, port);
             f.get(20, TimeUnit.SECONDS);
             // We should be done now...
             
@@ -102,7 +135,7 @@ public class AsyncLocalizerTest {
             
             verify(ops, never()).deleteIfExists(any(File.class));
         } finally {
-            al.shutdown();
+            bl.close();
             ConfigUtils.setInstance(orig);
             ReflectionUtils.setInstance(origRU);
             ServerUtils.setInstance(origUtils);
@@ -129,7 +162,7 @@ public class AsyncLocalizerTest {
         final File userDir = new File(stormLocal, user);
         final String stormRoot = stormLocal+topoId+"/";
         
-        final String localizerRoot = "/tmp/storm-localizer/";
+        final String localizerRoot = getTestLocalizerRoot();
         final String simpleLocalFile = localizerRoot + user + "/simple";
         final String simpleCurrentLocalFile = localizerRoot + user + "/simple.current";
        
@@ -146,7 +179,6 @@ public class AsyncLocalizerTest {
         
         Map<String, Object> conf = new HashMap<>();
         conf.put(Config.STORM_LOCAL_DIR, stormLocal);
-        Localizer localizer = mock(Localizer.class);
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         ConfigUtils mockedCU = mock(ConfigUtils.class);
 
@@ -157,33 +189,662 @@ public class AsyncLocalizerTest {
         List<LocalizedResource> localizedList = new ArrayList<>();
         LocalizedResource simpleLocal = new LocalizedResource(simpleKey, simpleLocalFile, false);
         localizedList.add(simpleLocal);
-        
-        AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops);
+
+        AsyncLocalizer bl = spy(new AsyncLocalizer(conf, localizerRoot, ops, new AtomicReference<>(new HashMap<>()), null));
         ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
         try {
             when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot);
             when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
             when(mockedCU.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(st);
-            
-            when(localizer.getLocalUserFileCacheDir(user)).thenReturn(userDir);
-            
-            when(localizer.getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir))).thenReturn(localizedList);
-            
-            Future<Void> f = al.requestDownloadTopologyBlobs(la, port);
+
+            //Write the mocking backwards so the actual method is not called on the spy object
+            doReturn(userDir).when(bl).getLocalUserFileCacheDir(user);
+            doReturn(localizedList).when(bl).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
+
+            Future<Void> f = bl.requestDownloadTopologyBlobs(la, port);
             f.get(20, TimeUnit.SECONDS);
             // We should be done now...
-            
-            verify(localizer).getLocalUserFileCacheDir(user);
+
+            verify(bl).getLocalUserFileCacheDir(user);
+
             verify(ops).fileExists(userDir);
             verify(ops).forceMkdir(userDir);
-            
-            verify(localizer).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
+
+            verify(bl).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
 
             verify(ops).createSymlink(new File(stormRoot, simpleLocalName), new File(simpleCurrentLocalFile));
         } finally {
-            al.shutdown();
+            bl.close();
             ConfigUtils.setInstance(orig);
         }
     }
 
+
+    //From LocalizerTest
+    private File baseDir;
+
+    private final String user1 = "user1";
+    private final String user2 = "user2";
+    private final String user3 = "user3";
+
+    private ClientBlobStore mockblobstore = mock(ClientBlobStore.class);
+
+
+    class TestLocalizer extends AsyncLocalizer {
+
+        TestLocalizer(Map<String, Object> conf, String baseDir) throws IOException {
+            super(conf, baseDir, AdvancedFSOps.make(conf), new AtomicReference<>(new HashMap<>()), null);
+        }
+
+        @Override
+        protected ClientBlobStore getClientBlobStore() {
+            return mockblobstore;
+        }
+    }
+
+    class TestInputStreamWithMeta extends InputStreamWithMeta {
+        private InputStream iostream;
+
+        public TestInputStreamWithMeta() {
+            iostream = IOUtils.toInputStream("some test data for my input stream");
+        }
+
+        public TestInputStreamWithMeta(InputStream istream) {
+            iostream = istream;
+        }
+
+        @Override
+        public long getVersion() throws IOException {
+            return 1;
+        }
+
+        @Override
+        public synchronized int read() {
+            return 0;
+        }
+
+        @Override
+        public synchronized int read(byte[] b)
+            throws IOException {
+            int length = iostream.read(b);
+            if (length == 0) {
+                return -1;
+            }
+            return length;
+        }
+
+        @Override
+        public long getFileLength() {
+            return 0;
+        }
+    };
+
+    @Before
+    public void setUp() throws Exception {
+        baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-"+ UUID.randomUUID());
+        if (!baseDir.mkdir()) {
+            throw new IOException("failed to create base directory");
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            FileUtils.deleteDirectory(baseDir);
+        } catch (IOException ignore) {}
+    }
+
+    protected String joinPath(String... pathList) {
+        return Joiner.on(File.separator).join(pathList);
+    }
+
+    public String constructUserCacheDir(String base, String user) {
+        return joinPath(base, USERCACHE, user);
+    }
+
+    public String constructExpectedFilesDir(String base, String user) {
+        return joinPath(constructUserCacheDir(base, user), AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+    }
+
+    public String constructExpectedArchivesDir(String base, String user) {
+        return joinPath(constructUserCacheDir(base, user), AsyncLocalizer.FILECACHE, AsyncLocalizer.ARCHIVESDIR);
+    }
+
+    @Test
+    public void testDirPaths() throws Exception {
+        Map<String, Object> conf = new HashMap();
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+        String expectedDir = constructUserCacheDir(baseDir.toString(), user1);
+        assertEquals("get local user dir doesn't return right value",
+            expectedDir, localizer.getLocalUserDir(user1).toString());
+
+        String expectedFileDir = joinPath(expectedDir, AsyncLocalizer.FILECACHE);
+        assertEquals("get local user file dir doesn't return right value",
+            expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString());
+    }
+
+    @Test
+    public void testReconstruct() throws Exception {
+        Map<String, Object> conf = new HashMap();
+
+        String expectedFileDir1 = constructExpectedFilesDir(baseDir.toString(), user1);
+        String expectedArchiveDir1 = constructExpectedArchivesDir(baseDir.toString(), user1);
+        String expectedFileDir2 = constructExpectedFilesDir(baseDir.toString(), user2);
+        String expectedArchiveDir2 = constructExpectedArchivesDir(baseDir.toString(), user2);
+
+        String key1 = "testfile1.txt";
+        String key2 = "testfile2.txt";
+        String key3 = "testfile3.txt";
+        String key4 = "testfile4.txt";
+
+        String archive1 = "archive1";
+        String archive2 = "archive2";
+
+        File user1file1 = new File(expectedFileDir1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File user1file2 = new File(expectedFileDir1, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File user2file3 = new File(expectedFileDir2, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File user2file4 = new File(expectedFileDir2, key4 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+        File user1archive1 = new File(expectedArchiveDir1, archive1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File user2archive2 = new File(expectedArchiveDir2, archive2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File user1archive1file = new File(user1archive1, "file1");
+        File user2archive2file = new File(user2archive2, "file2");
+
+        // setup some files/dirs to emulate supervisor restart
+        assertTrue("Failed setup filecache dir1", new File(expectedFileDir1).mkdirs());
+        assertTrue("Failed setup filecache dir2", new File(expectedFileDir2).mkdirs());
+        assertTrue("Failed setup file1", user1file1.createNewFile());
+        assertTrue("Failed setup file2", user1file2.createNewFile());
+        assertTrue("Failed setup file3", user2file3.createNewFile());
+        assertTrue("Failed setup file4", user2file4.createNewFile());
+        assertTrue("Failed setup archive dir1", user1archive1.mkdirs());
+        assertTrue("Failed setup archive dir2", user2archive2.mkdirs());
+        assertTrue("Failed setup file in archivedir1", user1archive1file.createNewFile());
+        assertTrue("Failed setup file in archivedir2", user2archive2file.createNewFile());
+
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+        ArrayList<LocalResource> arrUser1Keys = new ArrayList<LocalResource>();
+        arrUser1Keys.add(new LocalResource(key1, false));
+        arrUser1Keys.add(new LocalResource(archive1, true));
+        localizer.addReferences(arrUser1Keys, user1, "topo1");
+
+        LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
+        assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+        LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
+        assertNotNull("Local resource doesn't exist but should", key1rsrc);
+        assertEquals("key doesn't match", key1, key1rsrc.getKey());
+        assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+        LocalizedResource key2rsrc = lrsrcSet.get(key2, false);
+        assertNotNull("Local resource doesn't exist but should", key2rsrc);
+        assertEquals("key doesn't match", key2, key2rsrc.getKey());
+        assertEquals("refcount doesn't match", 0, key2rsrc.getRefCount());
+        LocalizedResource archive1rsrc = lrsrcSet.get(archive1, true);
+        assertNotNull("Local resource doesn't exist but should", archive1rsrc);
+        assertEquals("key doesn't match", archive1, archive1rsrc.getKey());
+        assertEquals("refcount doesn't match", 1, archive1rsrc.getRefCount());
+
+        LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
+        assertEquals("local resource set size wrong", 3, lrsrcSet2.getSize());
+        assertEquals("user doesn't match", user2, lrsrcSet2.getUser());
+        LocalizedResource key3rsrc = lrsrcSet2.get(key3, false);
+        assertNotNull("Local resource doesn't exist but should", key3rsrc);
+        assertEquals("key doesn't match", key3, key3rsrc.getKey());
+        assertEquals("refcount doesn't match", 0, key3rsrc.getRefCount());
+        LocalizedResource key4rsrc = lrsrcSet2.get(key4, false);
+        assertNotNull("Local resource doesn't exist but should", key4rsrc);
+        assertEquals("key doesn't match", key4, key4rsrc.getKey());
+        assertEquals("refcount doesn't match", 0, key4rsrc.getRefCount());
+        LocalizedResource archive2rsrc = lrsrcSet2.get(archive2, true);
+        assertNotNull("Local resource doesn't exist but should", archive2rsrc);
+        assertEquals("key doesn't match", archive2, archive2rsrc.getKey());
+        assertEquals("refcount doesn't match", 0, archive2rsrc.getRefCount());
+    }
+
+    @Test
+    public void testArchivesTgz() throws Exception {
+        testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tgz")), true, 21344);
+    }
+
+    @Test
+    public void testArchivesZip() throws Exception {
+        testArchives(getFileFromResource(joinPath("localizer", "localtest.zip")), false, 21348);
+    }
+
+    @Test
+    public void testArchivesTarGz() throws Exception {
+        testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar.gz")), true, 21344);
+    }
+
+    @Test
+    public void testArchivesTar() throws Exception {
+        testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar")), true, 21344);
+    }
+
+    @Test
+    public void testArchivesJar() throws Exception {
+        testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.jar")), false, 21416);
+    }
+
+    private File getFileFromResource(String archivePath) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(archivePath).getFile());
+    }
+
+    // archive passed in must contain symlink named tmptestsymlink if not a zip file
+    public void testArchives(File archiveFile, boolean supportSymlinks, int size) throws Exception {
+        if (Utils.isOnWindows()) {
+            // Windows should set this to false cause symlink in compressed file doesn't work properly.
+            supportSymlinks = false;
+        }
+
+        Map<String, Object> conf = new HashMap();
+        // set clean time really high so doesn't kick in
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+        String key1 = archiveFile.getName();
+        String topo1 = "topo1";
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+        // set really small so will do cleanup
+        localizer.setTargetCacheSize(1);
+
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+        when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+
+        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new
+            FileInputStream(archiveFile.getAbsolutePath())));
+
+        long timeBefore = System.nanoTime();
+        File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+        assertTrue("failed to create user dir", user1Dir.mkdirs());
+        LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, true), user1, topo1,
+            user1Dir);
+        long timeAfter = System.nanoTime();
+
+        String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+        String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.ARCHIVESDIR);
+        assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+        File keyFile = new File(expectedFileDir, key1 + ".0");
+        assertTrue("blob not created", keyFile.exists());
+        assertTrue("blob is not uncompressed", keyFile.isDirectory());
+        File symlinkFile = new File(keyFile, "tmptestsymlink");
+
+        if (supportSymlinks) {
+            assertTrue("blob uncompressed doesn't contain symlink", Files.isSymbolicLink(
+                symlinkFile.toPath()));
+        } else {
+            assertTrue("blob symlink file doesn't exist", symlinkFile.exists());
+        }
+
+        LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+        assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+        LocalizedResource key1rsrc = lrsrcSet.get(key1, true);
+        assertNotNull("Local resource doesn't exist but should", key1rsrc);
+        assertEquals("key doesn't match", key1, key1rsrc.getKey());
+        assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+        assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePathWithVersion());
+        assertEquals("size doesn't match", size, key1rsrc.getSize());
+        assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+            .getLastAccessTime() <= timeAfter));
+
+        timeBefore = System.nanoTime();
+        localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, true);
+        timeAfter = System.nanoTime();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+        key1rsrc = lrsrcSet.get(key1, true);
+        assertNotNull("Local resource doesn't exist but should", key1rsrc);
+        assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
+        assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+            .getLastAccessTime() <= timeAfter));
+
+        // should remove the blob since cache size set really small
+        localizer.cleanup();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        assertFalse("blob contents not deleted", symlinkFile.exists());
+        assertFalse("blob not deleted", keyFile.exists());
+        assertFalse("blob file dir not deleted", new File(expectedFileDir).exists());
+        assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
+        assertNull("user set should be null", lrsrcSet);
+
+    }
+
+    @Test
+    public void testBasic() throws Exception {
+        Map<String, Object> conf = new HashMap();
+        // set clean time really high so doesn't kick in
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+        String key1 = "key1";
+        String topo1 = "topo1";
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+        // set really small so will do cleanup
+        localizer.setTargetCacheSize(1);
+
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+        when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+
+        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+
+        long timeBefore = System.nanoTime();
+        File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+        assertTrue("failed to create user dir", user1Dir.mkdirs());
+        LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
+            user1Dir);
+        long timeAfter = System.nanoTime();
+
+        String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+        String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+        assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+        File keyFile = new File(expectedFileDir, key1);
+        File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+        assertTrue("blob not created", keyFileCurrentSymlink.exists());
+
+        LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+        assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+        LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
+        assertNotNull("Local resource doesn't exist but should", key1rsrc);
+        assertEquals("key doesn't match", key1, key1rsrc.getKey());
+        assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+        assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePath());
+        assertEquals("size doesn't match", 34, key1rsrc.getSize());
+        assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+            .getLastAccessTime() <= timeAfter));
+
+        timeBefore = System.nanoTime();
+        localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+        timeAfter = System.nanoTime();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+        key1rsrc = lrsrcSet.get(key1, false);
+        assertNotNull("Local resource doesn't exist but should", key1rsrc);
+        assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
+        assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+            .getLastAccessTime() <= timeAfter));
+
+        // should remove the blob since cache size set really small
+        localizer.cleanup();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        assertNull("user set should be null", lrsrcSet);
+        assertFalse("blob not deleted", keyFile.exists());
+        assertFalse("blob dir not deleted", new File(expectedFileDir).exists());
+        assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
+    }
+
+    @Test
+    public void testMultipleKeysOneUser() throws Exception {
+        Map<String, Object> conf = new HashMap();
+        // set clean time really high so doesn't kick in
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+        String key1 = "key1";
+        String topo1 = "topo1";
+        String key2 = "key2";
+        String key3 = "key3";
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+        // set to keep 2 blobs (each of size 34)
+        localizer.setTargetCacheSize(68);
+
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+        when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+        when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta());
+        when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta());
+
+        List<LocalResource> keys = Arrays.asList(new LocalResource[]{new LocalResource(key1, false),
+            new LocalResource(key2, false), new LocalResource(key3, false)});
+        File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+        assertTrue("failed to create user dir", user1Dir.mkdirs());
+
+        List<LocalizedResource> lrsrcs = localizer.getBlobs(keys, user1, topo1, user1Dir);
+        LocalizedResource lrsrc = lrsrcs.get(0);
+        LocalizedResource lrsrc2 = lrsrcs.get(1);
+        LocalizedResource lrsrc3 = lrsrcs.get(2);
+
+        String expectedFileDir = joinPath(baseDir.toString(), USERCACHE, user1,
+            AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+        assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+        File keyFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File keyFile2 = new File(expectedFileDir, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File keyFile3 = new File(expectedFileDir, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+        assertTrue("blob not created", keyFile.exists());
+        assertTrue("blob not created", keyFile2.exists());
+        assertTrue("blob not created", keyFile3.exists());
+        assertEquals("size doesn't match", 34, keyFile.length());
+        assertEquals("size doesn't match", 34, keyFile2.length());
+        assertEquals("size doesn't match", 34, keyFile3.length());
+        assertEquals("size doesn't match", 34, lrsrc.getSize());
+        assertEquals("size doesn't match", 34, lrsrc3.getSize());
+        assertEquals("size doesn't match", 34, lrsrc2.getSize());
+
+        LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
+        assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+
+        long timeBefore = System.nanoTime();
+        localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+        localizer.removeBlobReference(lrsrc2.getKey(), user1, topo1, false);
+        localizer.removeBlobReference(lrsrc3.getKey(), user1, topo1, false);
+        long timeAfter = System.nanoTime();
+
+        // add reference to one and then remove reference again so it has newer timestamp
+        lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+        assertTrue("timestamp not within range", (lrsrc.getLastAccessTime() >= timeBefore && lrsrc
+            .getLastAccessTime() <= timeAfter));
+        localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+
+        // should remove the second blob first
+        localizer.cleanup();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 2, lrsrcSet.getSize());
+        long end = System.currentTimeMillis() + 100;
+        while ((end - System.currentTimeMillis()) >= 0 && keyFile2.exists()) {
+            Thread.sleep(1);
+        }
+        assertFalse("blob not deleted", keyFile2.exists());
+        assertTrue("blob deleted", keyFile.exists());
+        assertTrue("blob deleted", keyFile3.exists());
+
+        // set size to cleanup another one
+        localizer.setTargetCacheSize(34);
+
+        // should remove the third blob
+        localizer.cleanup();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+        assertTrue("blob deleted", keyFile.exists());
+        assertFalse("blob not deleted", keyFile3.exists());
+    }
+
+    @Test(expected = AuthorizationException.class)
+    public void testFailAcls() throws Exception {
+        Map<String, Object> conf = new HashMap();
+        // set clean time really high so doesn't kick in
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
+        // enable blobstore acl validation
+        conf.put(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED, true);
+
+        String topo1 = "topo1";
+        String key1 = "key1";
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        // set acl so user doesn't have read access
+        AccessControl acl = new AccessControl(AccessControlType.USER, BlobStoreAclHandler.ADMIN);
+        acl.set_name(user1);
+        rbm.set_settable(new SettableBlobMeta(Arrays.asList(acl)));
+        when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+        File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+        assertTrue("failed to create user dir", user1Dir.mkdirs());
+
+        // This should throw AuthorizationException because auth failed
+        localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+    }
+
+    @Test(expected = KeyNotFoundException.class)
+    public void testKeyNotFoundException() throws Exception {
+        Map<String, Object> conf = Utils.readStormConfig();
+        String key1 = "key1";
+        conf.put(Config.STORM_LOCAL_DIR, "target");
+        LocalFsBlobStore bs = new LocalFsBlobStore();
+        LocalFsBlobStore spy = spy(bs);
+        Mockito.doReturn(true).when(spy).checkForBlobOrDownload(key1);
+        Mockito.doNothing().when(spy).checkForBlobUpdate(key1);
+        spy.prepare(conf,null,null);
+        spy.getBlob(key1, null);
+    }
+
+    @Test
+    public void testMultipleUsers() throws Exception {
+        Map<String, Object> conf = new HashMap();
+        // set clean time really high so doesn't kick in
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+        String topo1 = "topo1";
+        String topo2 = "topo2";
+        String topo3 = "topo3";
+        String key1 = "key1";
+        String key2 = "key2";
+        String key3 = "key3";
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+        // set to keep 2 blobs (each of size 34)
+        localizer.setTargetCacheSize(68);
+
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+        when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+        when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta());
+        when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta());
+
+        File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+        assertTrue("failed to create user dir", user1Dir.mkdirs());
+        File user2Dir = localizer.getLocalUserFileCacheDir(user2);
+        assertTrue("failed to create user dir", user2Dir.mkdirs());
+        File user3Dir = localizer.getLocalUserFileCacheDir(user3);
+        assertTrue("failed to create user dir", user3Dir.mkdirs());
+
+        LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
+            user1Dir);
+        LocalizedResource lrsrc2 = localizer.getBlob(new LocalResource(key2, false), user2, topo2,
+            user2Dir);
+        LocalizedResource lrsrc3 = localizer.getBlob(new LocalResource(key3, false), user3, topo3,
+            user3Dir);
+        // make sure we support different user reading same blob
+        LocalizedResource lrsrc1_user3 = localizer.getBlob(new LocalResource(key1, false), user3,
+            topo3, user3Dir);
+
+        String expectedUserDir1 = joinPath(baseDir.toString(), USERCACHE, user1);
+        String expectedFileDirUser1 = joinPath(expectedUserDir1, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+        String expectedFileDirUser2 = joinPath(baseDir.toString(), USERCACHE, user2,
+            AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+        String expectedFileDirUser3 = joinPath(baseDir.toString(), USERCACHE, user3,
+            AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+        assertTrue("user filecache dir user1 not created", new File(expectedFileDirUser1).exists());
+        assertTrue("user filecache dir user2 not created", new File(expectedFileDirUser2).exists());
+        assertTrue("user filecache dir user3 not created", new File(expectedFileDirUser3).exists());
+
+        File keyFile = new File(expectedFileDirUser1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File keyFile2 = new File(expectedFileDirUser2, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File keyFile3 = new File(expectedFileDirUser3, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File keyFile1user3 = new File(expectedFileDirUser3, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+        assertTrue("blob not created", keyFile.exists());
+        assertTrue("blob not created", keyFile2.exists());
+        assertTrue("blob not created", keyFile3.exists());
+        assertTrue("blob not created", keyFile1user3.exists());
+
+        LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+        LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
+        assertEquals("local resource set size wrong", 1, lrsrcSet2.getSize());
+        LocalizedResourceSet lrsrcSet3 = localizer.getUserResources().get(user3);
+        assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
+
+        localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+        // should remove key1
+        localizer.cleanup();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        lrsrcSet3 = localizer.getUserResources().get(user3);
+        assertNull("user set should be null", lrsrcSet);
+        assertFalse("blob dir not deleted", new File(expectedFileDirUser1).exists());
+        assertFalse("blob dir not deleted", new File(expectedUserDir1).exists());
+        assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
+
+        assertTrue("blob deleted", keyFile2.exists());
+        assertFalse("blob not deleted", keyFile.exists());
+        assertTrue("blob deleted", keyFile3.exists());
+        assertTrue("blob deleted", keyFile1user3.exists());
+    }
+
+    @Test
+    public void testUpdate() throws Exception {
+        Map<String, Object> conf = new HashMap();
+        // set clean time really high so doesn't kick in
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+        String key1 = "key1";
+        String topo1 = "topo1";
+        String topo2 = "topo2";
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_version(1);
+        rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+        when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+
+        File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+        assertTrue("failed to create user dir", user1Dir.mkdirs());
+        LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
+            user1Dir);
+
+        String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+        String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+        assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+        File keyFile = new File(expectedFileDir, key1);
+        File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        assertTrue("blob not created", keyFileCurrentSymlink.exists());
+        File versionFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_BLOB_VERSION_SUFFIX);
+        assertTrue("blob version file not created", versionFile.exists());
+        assertEquals("blob version not correct", 1, ServerUtils.localVersionOfBlob(keyFile.toString()));
+
+        LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+
+        // test another topology getting blob with updated version - it should update version now
+        rbm.set_version(2);
+
+        localizer.getBlob(new LocalResource(key1, false), user1, topo2, user1Dir);
+        assertTrue("blob version file not created", versionFile.exists());
+        assertEquals("blob version not correct", 2, ServerUtils.localVersionOfBlob(keyFile.toString()));
+        assertTrue("blob file with version 2 not created", new File(keyFile + ".2").exists());
+
+        // now test regular updateBlob
+        rbm.set_version(3);
+
+        ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
+        arr.add(new LocalResource(key1, false));
+        localizer.updateBlobs(arr, user1);
+        assertTrue("blob version file not created", versionFile.exists());
+        assertEquals("blob version not correct", 3, ServerUtils.localVersionOfBlob(keyFile.toString()));
+        assertTrue("blob file with version 3 not created", new File(keyFile + ".3").exists());
+    }
 }