Posted to commits@storm.apache.org by bo...@apache.org on 2015/12/04 16:03:59 UTC

[04/17] storm git commit: Blobstore API STORM-876

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/localizer/Localizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/localizer/Localizer.java b/storm-core/src/jvm/backtype/storm/localizer/Localizer.java
new file mode 100644
index 0000000..ef5684f
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/localizer/Localizer.java
@@ -0,0 +1,695 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.localizer;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.ClientBlobStore;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.utils.ShellUtils.ExitCodeException;
+import backtype.storm.utils.ShellUtils.ShellCommandExecutor;
+import backtype.storm.utils.Utils;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+
+/**
+ * Class to download and manage files from the blobstore.  It uses an LRU cache
+ * to determine which files to keep so they can be reused and which files to delete.
+ */
+public class Localizer {
+  public static final Logger LOG = LoggerFactory.getLogger(Localizer.class);
+
+  private Map _conf;
+  private int _threadPoolSize;
+  // thread pool for initial download
+  private ExecutorService _execService;
+  // thread pool for updates
+  private ExecutorService _updateExecService;
+  private int _blobDownloadRetries;
+
+  // track resources - user to resourceSet
+  private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new
+      ConcurrentHashMap<String, LocalizedResourceSet>();
+
+  private String _localBaseDir;
+  public static final String USERCACHE = "usercache";
+  public static final String FILECACHE = "filecache";
+
+  // subdirectories storing plain files and uncompressed archives, respectively
+  public static final String FILESDIR = "files";
+  public static final String ARCHIVESDIR = "archives";
+
+  private static final String TO_UNCOMPRESS = "_tmp_";
+
+  // cleanup
+  private long _cacheTargetSize;
+  private long _cacheCleanupPeriod;
+  private ScheduledExecutorService _cacheCleanupService;
+
+  public Localizer(Map conf, String baseDir) {
+    _conf = conf;
+    _localBaseDir = baseDir;
+    // default cache size 10 GB; the config value is in MB, shifted left 20 bits to get bytes
+    _cacheTargetSize = Utils.getInt(_conf.get(Config.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB),
+            10 * 1024).longValue() << 20;
+    // default 10 minutes.
+    _cacheCleanupPeriod = Utils.getInt(_conf.get(
+            Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 10 * 60 * 1000).longValue();
+
+    // if needed, the update thread pool size could be made separately configurable
+    _threadPoolSize = Utils.getInt(_conf.get(Config.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
+    _blobDownloadRetries = Utils.getInt(_conf.get(
+            Config.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
+
+    _execService = Executors.newFixedThreadPool(_threadPoolSize);
+    _updateExecService = Executors.newFixedThreadPool(_threadPoolSize);
+    reconstructLocalizedResources();
+  }
+
+  // For testing: allows setting the cache target size in bytes
+  protected void setTargetCacheSize(long size) {
+    _cacheTargetSize = size;
+  }
+
+  // For testing, be careful as it doesn't clone
+  ConcurrentMap<String, LocalizedResourceSet> getUserResources() {
+    return _userRsrc;
+  }
+
+  public void startCleaner() {
+    _cacheCleanupService = new ScheduledThreadPoolExecutor(1,
+        new ThreadFactoryBuilder()
+            .setNameFormat("Localizer Cache Cleanup")
+            .build());
+
+    _cacheCleanupService.scheduleWithFixedDelay(new Runnable() {
+          @Override
+          public void run() {
+            handleCacheCleanup();
+          }
+        }, _cacheCleanupPeriod, _cacheCleanupPeriod, TimeUnit.MILLISECONDS);
+  }
+
+  public void shutdown() {
+    if (_cacheCleanupService != null) {
+      _cacheCleanupService.shutdown();
+    }
+    if (_execService != null) {
+      _execService.shutdown();
+    }
+    if (_updateExecService != null) {
+      _updateExecService.shutdown();
+    }
+  }
+
+  // baseDir/supervisor/usercache/
+  protected File getUserCacheDir() {
+    return new File(_localBaseDir, USERCACHE);
+  }
+
+  // baseDir/supervisor/usercache/user1/
+  protected File getLocalUserDir(String userName) {
+    return new File(getUserCacheDir(), userName);
+  }
+
+  // baseDir/supervisor/usercache/user1/filecache
+  public File getLocalUserFileCacheDir(String userName) {
+    return new File(getLocalUserDir(userName), FILECACHE);
+  }
+
+  // baseDir/supervisor/usercache/user1/filecache/files
+  protected File getCacheDirForFiles(File dir) {
+    return new File(dir, FILESDIR);
+  }
+
+  // get the directory to put uncompressed archives in
+  // baseDir/supervisor/usercache/user1/filecache/archives
+  protected File getCacheDirForArchives(File dir) {
+    return new File(dir, ARCHIVESDIR);
+  }
+
+  protected void addLocalizedResourceInDir(String dir, LocalizedResourceSet lrsrcSet,
+      boolean uncompress) {
+    File[] lrsrcs = readCurrentBlobs(dir);
+
+    if (lrsrcs != null) {
+      for (File rsrc : lrsrcs) {
+        LOG.info("add localized in dir found: " + rsrc);
+        /// strip off .suffix
+        String path = rsrc.getPath();
+        int p = path.lastIndexOf('.');
+        if (p > 0) {
+          path = path.substring(0, p);
+        }
+        LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path);
+        LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path,
+            uncompress);
+        lrsrcSet.addResource(lrsrc.getKey(), lrsrc, uncompress);
+      }
+    }
+  }
+
+  protected File[] readDirContents(String location) {
+    File dir = new File(location);
+    File[] files = null;
+    if (dir.exists()) {
+      files = dir.listFiles();
+    }
+    return files;
+  }
+
+  // Looks for files in the directory with .current suffix
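+  // (e.g. for blob key "mykey" the localized file shows up here as "mykey.current",
+  // a symlink pointing at the versioned file "mykey.<version>")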
+  protected File[] readCurrentBlobs(String location) {
+    File dir = new File(location);
+    File[] files = null;
+    if (dir.exists()) {
+      files = dir.listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          return name.toLowerCase().endsWith(Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        }
+      });
+    }
+    return files;
+  }
+
+  // Check to see if there are any existing files already localized.
+  protected void reconstructLocalizedResources() {
+    try {
+      LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath());
+      File[] users = readDirContents(getUserCacheDir().getPath());
+
+      if (users != null) {
+        for (File userDir : users) {
+          String user = userDir.getName();
+          LOG.debug("looking in: {} for user: {}", userDir.getPath(), user);
+          LocalizedResourceSet newSet = new LocalizedResourceSet(user);
+          LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
+          if (lrsrcSet == null) {
+            lrsrcSet = newSet;
+          }
+          addLocalizedResourceInDir(getCacheDirForFiles(getLocalUserFileCacheDir(user)).getPath(),
+              lrsrcSet, false);
+          addLocalizedResourceInDir(
+              getCacheDirForArchives(getLocalUserFileCacheDir(user)).getPath(),
+              lrsrcSet, true);
+        }
+      } else {
+        LOG.warn("No left over resources found for any user during reconstructing of local resources at: {}", getUserCacheDir().getPath());
+      }
+    } catch (Exception e) {
+      LOG.error("ERROR reconstructing localized resources", e);
+    }
+  }
+
+  // ignores invalid user/topo/key
+  public synchronized void removeBlobReference(String key, String user, String topo,
+      boolean uncompress) throws AuthorizationException, KeyNotFoundException {
+    LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
+    if (lrsrcSet != null) {
+      LocalizedResource lrsrc = lrsrcSet.get(key, uncompress);
+      if (lrsrc != null) {
+        LOG.debug("removing blob reference to: {} for topo: {}", key, topo);
+        lrsrc.removeReference(topo);
+      } else {
+        LOG.warn("trying to remove non-existent blob, key: " + key + " for user: " + user +
+            " topo: " + topo);
+      }
+    } else {
+      LOG.warn("trying to remove blob for non-existent resource set for user: " + user + " key: "
+          + key + " topo: " + topo);
+    }
+  }
+
+  public synchronized void addReferences(List<LocalResource> localresource, String user,
+       String topo) {
+    LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
+    if (lrsrcSet != null) {
+      for (LocalResource blob : localresource) {
+        LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName(), blob.shouldUncompress());
+        if (lrsrc != null) {
+          lrsrc.addReference(topo);
+          LOG.debug("added reference for topo: {} key: {}", topo, blob);
+        } else {
+          LOG.warn("trying to add reference to non-existent blob, key: " + blob + " topo: " + topo);
+        }
+      }
+    } else {
+      LOG.warn("trying to add reference to non-existent local resource set, " +
+          "user: " + user + " topo: " + topo);
+    }
+  }
+
+  /**
+   * Returns the blob from the existing cache if present; otherwise downloads it
+   * and blocks until the download is complete.
+   */
+  public LocalizedResource getBlob(LocalResource localResource, String user, String topo,
+       File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException {
+    ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
+    arr.add(localResource);
+    List<LocalizedResource> results = getBlobs(arr, user, topo, userFileDir);
+    if (results.size() != 1) {
+      throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + user +
+          ", topo: " + topo);
+    }
+    return results.get(0);
+  }
+
+  protected boolean isLocalizedResourceDownloaded(LocalizedResource lrsrc) {
+    File rsrcFileCurrent = new File(lrsrc.getCurrentSymlinkPath());
+    File rsrcFileWithVersion = new File(lrsrc.getFilePathWithVersion());
+    File versionFile = new File(lrsrc.getVersionFilePath());
+    return (rsrcFileWithVersion.exists() && rsrcFileCurrent.exists() && versionFile.exists());
+  }
+
+  protected boolean isLocalizedResourceUpToDate(LocalizedResource lrsrc,
+      ClientBlobStore blobstore) throws AuthorizationException, KeyNotFoundException {
+    String localFile = lrsrc.getFilePath();
+    long nimbusBlobVersion = Utils.nimbusVersionOfBlob(lrsrc.getKey(), blobstore);
+    long currentBlobVersion = Utils.localVersionOfBlob(localFile);
+    return (nimbusBlobVersion == currentBlobVersion);
+  }
+
+  protected ClientBlobStore getClientBlobStore() {
+    return Utils.getClientBlobStoreForSupervisor(_conf);
+  }
+
+  /**
+   * Updates blobs on the supervisor. It uses a separate thread pool and runs
+   * asynchronously from the initial download and the cleanup.
+   */
+  public List<LocalizedResource> updateBlobs(List<LocalResource> localResources,
+       String user) throws AuthorizationException, KeyNotFoundException, IOException {
+    LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
+    ArrayList<LocalizedResource> results = new ArrayList<>();
+    ArrayList<Callable<LocalizedResource>> updates = new ArrayList<>();
+
+    if (lrsrcSet == null) {
+      // resource set must have been removed
+      return results;
+    }
+    ClientBlobStore blobstore = null;
+    try {
+      blobstore = getClientBlobStore();
+      for (LocalResource localResource: localResources) {
+        String key = localResource.getBlobName();
+        LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
+        if (lrsrc == null) {
+          LOG.warn("blob requested for update doesn't exist: {}", key);
+          continue;
+        }
+        // update it if either the version isn't the latest or if any local blob files are missing
+        if (!isLocalizedResourceUpToDate(lrsrc, blobstore) ||
+            !isLocalizedResourceDownloaded(lrsrc)) {
+          LOG.debug("updating blob: {}", key);
+          updates.add(new DownloadBlob(this, _conf, key, new File(lrsrc.getFilePath()), user,
+              lrsrc.isUncompressed(), true));
+        }
+      }
+    } finally {
+      if (blobstore != null) {
+        blobstore.shutdown();
+      }
+    }
+    try {
+      List<Future<LocalizedResource>> futures = _updateExecService.invokeAll(updates);
+      for (Future<LocalizedResource> futureRsrc : futures) {
+        try {
+          LocalizedResource lrsrc = futureRsrc.get();
+          // re-insert the resource set in case the cleaner removed it at the same time
+          LocalizedResourceSet newSet = new LocalizedResourceSet(user);
+          LocalizedResourceSet newlrsrcSet = _userRsrc.putIfAbsent(user, newSet);
+          if (newlrsrcSet == null) {
+            newlrsrcSet = newSet;
+          }
+          newlrsrcSet.updateResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+          results.add(lrsrc);
+        } catch (ExecutionException e) {
+          LOG.error("Error updating blob: ", e);
+          if (e.getCause() instanceof AuthorizationException) {
+            throw (AuthorizationException)e.getCause();
+          }
+          if (e.getCause() instanceof KeyNotFoundException) {
+            throw (KeyNotFoundException)e.getCause();
+          }
+        }
+      }
+    } catch (RejectedExecutionException re) {
+      LOG.error("Error updating blobs : ", re);
+    } catch (InterruptedException ie) {
+      throw new IOException("Interrupted Exception", ie);
+    }
+    return results;
+  }
+
+  /**
+   * Returns the blobs from the existing cache if present; otherwise downloads them
+   * in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT threads) and
+   * blocks until all of them have been downloaded.
+   */
+  public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources,
+      String user, String topo, File userFileDir)
+      throws AuthorizationException, KeyNotFoundException, IOException {
+
+    LocalizedResourceSet newSet = new LocalizedResourceSet(user);
+    LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
+    if (lrsrcSet == null) {
+      lrsrcSet = newSet;
+    }
+    ArrayList<LocalizedResource> results = new ArrayList<>();
+    ArrayList<Callable<LocalizedResource>> downloads = new ArrayList<>();
+
+    ClientBlobStore blobstore = null;
+    try {
+      blobstore = getClientBlobStore();
+      for (LocalResource localResource: localResources) {
+        String key = localResource.getBlobName();
+        boolean uncompress = localResource.shouldUncompress();
+        LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
+        boolean isUpdate = false;
+        if ((lrsrc != null) && (lrsrc.isUncompressed() == localResource.shouldUncompress()) &&
+            (isLocalizedResourceDownloaded(lrsrc))) {
+          if (isLocalizedResourceUpToDate(lrsrc, blobstore)) {
+            LOG.debug("blob already exists: {}", key);
+            lrsrc.addReference(topo);
+            results.add(lrsrc);
+            continue;
+          }
+          LOG.debug("blob exists but isn't up to date: {}", key);
+          isUpdate = true;
+        }
+
+        // go off to blobstore and get it
+        // assume the dir passed in exists and has the correct permissions
+        LOG.debug("fetching blob: {}", key);
+        File downloadDir = getCacheDirForFiles(userFileDir);
+        File localFile = new File(downloadDir, key);
+        if (uncompress) {
+          // for compressed file, download to archives dir
+          downloadDir = getCacheDirForArchives(userFileDir);
+          localFile = new File(downloadDir, key);
+        }
+        downloadDir.mkdir();
+        downloads.add(new DownloadBlob(this, _conf, key, localFile, user, uncompress,
+            isUpdate));
+      }
+    } finally {
+      if (blobstore != null) {
+        blobstore.shutdown();
+      }
+    }
+    try {
+      List<Future<LocalizedResource>> futures = _execService.invokeAll(downloads);
+      for (Future<LocalizedResource> futureRsrc: futures) {
+        LocalizedResource lrsrc = futureRsrc.get();
+        lrsrc.addReference(topo);
+        lrsrcSet.addResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+        results.add(lrsrc);
+      }
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof AuthorizationException) {
+        throw (AuthorizationException) e.getCause();
+      } else if (e.getCause() instanceof KeyNotFoundException) {
+        throw (KeyNotFoundException) e.getCause();
+      } else {
+        throw new IOException("Error getting blobs", e);
+      }
+    } catch (RejectedExecutionException re) {
+      throw new IOException("RejectedExecutionException: ", re);
+    } catch (InterruptedException ie) {
+      throw new IOException("Interrupted Exception", ie);
+    }
+    return results;
+  }
+
+  static class DownloadBlob implements Callable<LocalizedResource> {
+
+    private Localizer _localizer;
+    private Map _conf;
+    private String _key;
+    private File _localFile;
+    private String _user;
+    private boolean _uncompress;
+    private boolean _isUpdate;
+
+    public DownloadBlob(Localizer localizer, Map conf, String key, File localFile,
+        String user, boolean uncompress, boolean update) {
+      _localizer = localizer;
+      _conf = conf;
+      _key = key;
+      _localFile = localFile;
+      _user = user;
+      _uncompress = uncompress;
+      _isUpdate = update;
+    }
+
+    @Override
+    public LocalizedResource call()
+        throws AuthorizationException, KeyNotFoundException, IOException  {
+      return _localizer.downloadBlob(_conf, _key, _localFile, _user, _uncompress,
+        _isUpdate);
+    }
+  }
+
+  private LocalizedResource downloadBlob(Map conf, String key, File localFile,
+      String user, boolean uncompress, boolean isUpdate)
+      throws AuthorizationException, KeyNotFoundException, IOException {
+    ClientBlobStore blobstore = null;
+    try {
+      blobstore = getClientBlobStore();
+      long nimbusBlobVersion = Utils.nimbusVersionOfBlob(key, blobstore);
+      long oldVersion = Utils.localVersionOfBlob(localFile.toString());
+      FileOutputStream out = null;
+      PrintWriter writer = null;
+      int numTries = 0;
+      String localizedPath = localFile.toString();
+      String localFileWithVersion = Utils.constructBlobWithVersionFileName(localFile.toString(),
+              nimbusBlobVersion);
+      String localVersionFile = Utils.constructVersionFileName(localFile.toString());
+      String downloadFile = localFileWithVersion;
+      if (uncompress) {
+        // we need to download to temp file and then unpack into the one requested
+        downloadFile = new File(localFile.getParent(), TO_UNCOMPRESS + localFile.getName()).toString();
+      }
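+      // On-disk layout per blob: "<key>.<version>" holds the blob contents,
+      // "<key>.version" records the current version number, and "<key>.current"
+      // is a symlink pointing at the active "<key>.<version>" file.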
+      while (numTries < _blobDownloadRetries) {
+        out = new FileOutputStream(downloadFile);
+        numTries++;
+        try {
+          if (!Utils.canUserReadBlob(blobstore.getBlobMeta(key), user)) {
+            throw new AuthorizationException(user + " does not have READ access to " + key);
+          }
+          InputStreamWithMeta in = blobstore.getBlob(key);
+          byte[] buffer = new byte[1024];
+          int len;
+          while ((len = in.read(buffer)) >= 0) {
+            out.write(buffer, 0, len);
+          }
+          out.close();
+          in.close();
+          if (uncompress) {
+            Utils.unpack(new File(downloadFile), new File(localFileWithVersion));
+            LOG.debug("uncompressed " + downloadFile + " to: " + localFileWithVersion);
+          }
+
+          // Next write the version.
+          LOG.info("Blob: " + key + " updated with new Nimbus-provided version: " +
+              nimbusBlobVersion + " local version was: " + oldVersion);
+          // The false parameter ensures overwriting the version file, not appending
+          writer = new PrintWriter(
+              new BufferedWriter(new FileWriter(localVersionFile, false)));
+          writer.println(nimbusBlobVersion);
+          writer.close();
+
+          try {
+            setBlobPermissions(conf, user, localFileWithVersion);
+            setBlobPermissions(conf, user, localVersionFile);
+
+            // Update the key.current symlink. First create tmp symlink and do
+            // move of tmp to current so that the operation is atomic.
+            String tmpUuid = java.util.UUID.randomUUID().toString();
+            LOG.debug("Creating a symlink @" + localFile + "." + tmpUuid +
+                ", linking to: " + localFile + "." + nimbusBlobVersion);
+            File uuidSymlink = new File(localFile + "." + tmpUuid);
+
+            Files.createSymbolicLink(uuidSymlink.toPath(),
+                Paths.get(Utils.constructBlobWithVersionFileName(localFile.toString(),
+                        nimbusBlobVersion)));
+            File currentSymlink = new File(Utils.constructBlobCurrentSymlinkName(
+                    localFile.toString()));
+            Files.move(uuidSymlink.toPath(), currentSymlink.toPath(), ATOMIC_MOVE);
+          } catch (IOException e) {
+            // if we fail after writing the version file but before we move current link we need to
+            // restore the old version to the file
+            try {
+              PrintWriter restoreWriter = new PrintWriter(
+                  new BufferedWriter(new FileWriter(localVersionFile, false)));
+              restoreWriter.println(oldVersion);
+              restoreWriter.close();
+            } catch (IOException ignore) {}
+            throw e;
+          }
+
+          String oldBlobFile = localFile + "." + oldVersion;
+          try {
+            // Remove the old version. Note that if a number of processes have that file open,
+            // the OS will keep the old blob file around until they all close the handle and only
+            // then deletes it. No new process will open the old blob, since the users will open the
+            // blob through the "blob.current" symlink, which always points to the latest version of
+            // a blob. Remove the old version after the current symlink is updated as to not affect
+            // anyone trying to read it.
+            if ((oldVersion != -1) && (oldVersion != nimbusBlobVersion)) {
+              LOG.info("Removing an old blob file:" + oldBlobFile);
+              Files.delete(Paths.get(oldBlobFile));
+            }
+          } catch (IOException e) {
+            // At this point we have downloaded everything and moved the symlinks. If removing
+            // the old version fails, just log an error.
+            LOG.error("Exception removing old blob version: " + oldBlobFile, e);
+          }
+
+          break;
+        } catch (AuthorizationException ae) {
+          // we consider this a non-retriable exception
+          if (out != null) {
+            out.close();
+          }
+          new File(downloadFile).delete();
+          throw ae;
+        } catch (IOException | KeyNotFoundException e) {
+          if (out != null) {
+            out.close();
+          }
+          if (writer != null) {
+            writer.close();
+          }
+          new File(downloadFile).delete();
+          if (uncompress) {
+            try {
+              FileUtils.deleteDirectory(new File(localFileWithVersion));
+            } catch (IOException ignore) {}
+          }
+          if (!isUpdate) {
+            // don't want to remove the existing version file if it's an update
+            new File(localVersionFile).delete();
+          }
+
+          if (numTries < _blobDownloadRetries) {
+            LOG.error("Failed to download blob, retrying", e);
+          } else {
+            throw e;
+          }
+        }
+      }
+      return new LocalizedResource(key, localizedPath, uncompress);
+    } finally {
+      if(blobstore != null) {
+        blobstore.shutdown();
+      }
+    }
+  }
+
+  public void setBlobPermissions(Map conf, String user, String path)
+      throws IOException {
+
+    if (!Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+      return;
+    }
+    String wlCommand = Utils.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), "");
+    if (wlCommand.isEmpty()) {
+      String stormHome = System.getProperty("storm.home");
+      wlCommand = stormHome + "/bin/worker-launcher";
+    }
+    List<String> command = new ArrayList<String>(Arrays.asList(wlCommand, user, "blob", path));
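+    // i.e. this runs: <worker-launcher> <user> blob <path>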
+
+    String[] commandArray = command.toArray(new String[command.size()]);
+    ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
+    LOG.info("Setting blob permissions, command: {}", Arrays.toString(commandArray));
+
+    try {
+      shExec.execute();
+      LOG.debug("output: {}", shExec.getOutput());
+    } catch (ExitCodeException e) {
+      int exitCode = shExec.getExitCode();
+      LOG.warn("Exit code from worker-launcher is : " + exitCode, e);
+      LOG.debug("output: {}", shExec.getOutput());
+      throw new IOException("Setting blob permissions failed" +
+          " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
+    }
+  }
+
+
+  public synchronized void handleCacheCleanup() {
+    LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(_cacheTargetSize);
+    // build one large set of all users' resources, then clean via LRU
+    for (LocalizedResourceSet t : _userRsrc.values()) {
+      toClean.addResources(t);
+      LOG.debug("Resources to be cleaned after adding {} : {}", t.getUser(), toClean);
+    }
+    toClean.cleanup();
+    LOG.debug("Resource cleanup: {}", toClean);
+    for (LocalizedResourceSet t : _userRsrc.values()) {
+      if (t.getSize() == 0) {
+        String user = t.getUser();
+
+        LOG.debug("removing empty set: {}", user);
+        File userFileCacheDir = getLocalUserFileCacheDir(user);
+        getCacheDirForFiles(userFileCacheDir).delete();
+        getCacheDirForArchives(userFileCacheDir).delete();
+        getLocalUserFileCacheDir(user).delete();
+        boolean dirsRemoved = getLocalUserDir(user).delete();
+        // to catch race with update thread
+        if (dirsRemoved) {
+          _userRsrc.remove(user);
+        }
+      }
+    }
+  }
+}

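A minimal supervisor-side usage sketch of the Localizer above, for orientation.
This is hypothetical driver code, not part of the patch: it assumes a
LocalResource(blobName, shouldUncompress) constructor (not shown in this
excerpt), makes up the user, topology id, blob keys, and base dir, and elides
exception handling.

    Map conf = Utils.readStormConfig();
    Localizer localizer = Utils.createLocalizer(conf, "/var/storm/supervisor");
    localizer.startCleaner();

    // hypothetical blob keys; the boolean marks archives to uncompress after download
    List<LocalResource> resources = Arrays.asList(
        new LocalResource("lookup-data", false),
        new LocalResource("models.tar.gz", true));

    // blocks until all blobs are downloaded (or found up to date in the cache)
    File userDir = localizer.getLocalUserFileCacheDir("bob");
    List<LocalizedResource> localized = localizer.getBlobs(resources, "bob", "topo-1", userDir);

    // later: refresh anything stale, then drop references when the topology is done
    localizer.updateBlobs(resources, "bob");
    localizer.removeBlobReference("lookup-data", "bob", "topo-1", false);
    localizer.shutdown();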
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/security/auth/NimbusPrincipal.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/NimbusPrincipal.java b/storm-core/src/jvm/backtype/storm/security/auth/NimbusPrincipal.java
new file mode 100644
index 0000000..c07ae84
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/NimbusPrincipal.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth;
+
+import java.security.Principal;
+
+public class NimbusPrincipal implements Principal {
+
+  @Override
+  public String getName() {
+    return NimbusPrincipal.class.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/utils/BufferInputStream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/BufferInputStream.java b/storm-core/src/jvm/backtype/storm/utils/BufferInputStream.java
new file mode 100644
index 0000000..c718858
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/BufferInputStream.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+
+/**
+ * Reads an InputStream in fixed-size chunks. Note that read() may return the
+ * internal buffer itself, so callers must consume each chunk before the next
+ * call; at end of stream it closes the input and returns an empty array.
+ */
+public class BufferInputStream {
+    byte[] buffer;
+    InputStream stream;
+
+    public BufferInputStream(InputStream stream, int bufferSize) {
+        this.stream = stream;
+        buffer = new byte[bufferSize];
+    }
+
+    public BufferInputStream(InputStream stream) {
+        this(stream, 15 * 1024);
+    }
+
+    public byte[] read() throws IOException {
+        int length = stream.read(buffer);
+        if (length == -1) {
+            close();
+            return new byte[0];
+        } else if (length == buffer.length) {
+            return buffer;
+        } else {
+            return Arrays.copyOf(buffer, length);
+        }
+    }
+
+    public void close() throws IOException {
+        stream.close();
+    }
+}

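A sketch of the intended read loop for BufferInputStream, for orientation
(hypothetical file names; each chunk must be consumed before the next read(),
since the internal buffer may be reused):

    InputStream base = new FileInputStream("some-blob");        // hypothetical source
    BufferInputStream in = new BufferInputStream(base, 8 * 1024);
    OutputStream out = new FileOutputStream("some-copy");
    byte[] chunk = in.read();
    while (chunk.length > 0) {    // read() returns an empty array at end of stream
        out.write(chunk);
        chunk = in.read();
    }
    out.close();                  // read() already closed the input at end of stream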
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java b/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
index 8595b71..55e3866 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
@@ -102,6 +102,13 @@ abstract public class ShellUtils {
         this(interval, false);
     }
 
+    /** Get the exit code.
+     * @return the exit code of the process
+     */
+    public int getExitCode() {
+      return exitCode;
+    }
+
     /**
      * @param interval the minimum duration to wait before re-executing the
      *        command.

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index c086be2..e0bbb1f 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -18,18 +18,35 @@
 package backtype.storm.utils;
 
 import backtype.storm.Config;
+import backtype.storm.blobstore.BlobStore;
+import backtype.storm.blobstore.BlobStoreAclHandler;
+import backtype.storm.blobstore.ClientBlobStore;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.blobstore.LocalFsBlobStore;
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
 import backtype.storm.generated.AuthorizationException;
 import backtype.storm.generated.ComponentCommon;
 import backtype.storm.generated.ComponentObject;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.localizer.Localizer;
+import backtype.storm.nimbus.NimbusInfo;
 import backtype.storm.serialization.DefaultSerializationDelegate;
 import backtype.storm.serialization.SerializationDelegate;
 import clojure.lang.IFn;
 import clojure.lang.RT;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
@@ -39,13 +56,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.SafeConstructor;
-
 import java.net.URL;
 import java.net.URLDecoder;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.attribute.PosixFilePermission;
+
 import java.io.File;
+import java.io.FileReader;
 import java.io.FileInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectOutputStream;
@@ -57,13 +77,21 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.FileOutputStream;
 import java.io.BufferedReader;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.PrintStream;
 import java.io.RandomAccessFile;
 import java.io.Serializable;
 import java.io.IOException;
+
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
 import java.util.Map;
+import java.util.Set;
 import java.util.Iterator;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.HashMap;
@@ -83,6 +111,9 @@ import org.apache.thrift.TSerializer;
 public class Utils {
     private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
     public static final String DEFAULT_STREAM_ID = "default";
+    public static final String DEFAULT_BLOB_VERSION_SUFFIX = ".version";
+    public static final String CURRENT_BLOB_SUFFIX_ID = "current";
+    public static final String DEFAULT_CURRENT_BLOB_SUFFIX = "." + CURRENT_BLOB_SUFFIX_ID;
     private static ThreadLocal<TSerializer> threadSer = new ThreadLocal<TSerializer>();
     private static ThreadLocal<TDeserializer> threadDes = new ThreadLocal<TDeserializer>();
 
@@ -110,28 +141,6 @@ public class Utils {
         return serializationDelegate.deserialize(serialized, clazz);
     }
 
-    public static byte[] thriftSerialize(TBase t) {
-        try {
-            TSerializer ser = threadSer.get();
-            if (ser == null) {
-                ser = new TSerializer();
-                threadSer.set(ser);
-            } 
-            return ser.serialize(t);
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static TDeserializer getDes() {
-        TDeserializer des = threadDes.get();
-        if(des == null) {
-            des = new TDeserializer();
-            threadDes.set(des);
-        }
-        return des;
-    }
-
     public static <T> T thriftDeserialize(Class c, byte[] b, int offset, int length) {
         try {
             T ret = (T) c.newInstance();
@@ -143,18 +152,6 @@ public class Utils {
         }
     }
 
-    public static <T> T thriftDeserialize(Class c, byte[] b) {
-        try {
-            T ret = (T) c.newInstance();
-            TDeserializer des = getDes();
-            des.deserialize((TBase) ret, b);
-            return ret;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-    }
-    
     public static byte[] javaSerialize(Object obj) {
         try {
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -431,38 +428,170 @@ public class Utils {
         return ret;
     }
 
-    public static void downloadFromMaster(Map conf, String file, String localFile) throws AuthorizationException, IOException, TException {
-        NimbusClient client = NimbusClient.getConfiguredClient(conf);
-        try {
-            download(client, file, localFile);
-        } finally {
-            client.close();
+
+    public static Localizer createLocalizer(Map conf, String baseDir) {
+        return new Localizer(conf, baseDir);
+    }
+
+    public static ClientBlobStore getClientBlobStoreForSupervisor(Map conf) {
+        ClientBlobStore store = (ClientBlobStore) newInstance(
+                (String) conf.get(Config.SUPERVISOR_BLOBSTORE));
+        store.prepare(conf);
+        return store;
+    }
+
+    public static BlobStore getNimbusBlobStore(Map conf, NimbusInfo nimbusInfo) {
+        return getNimbusBlobStore(conf, null, nimbusInfo);
+    }
+
+    public static BlobStore getNimbusBlobStore(Map conf, String baseDir, NimbusInfo nimbusInfo) {
+        String type = (String)conf.get(Config.NIMBUS_BLOBSTORE);
+        if (type == null) {
+            type = LocalFsBlobStore.class.getName();
         }
+        BlobStore store = (BlobStore) newInstance(type);
+        HashMap nconf = new HashMap(conf);
+        // only enable cleanup of blobstore on nimbus
+        nconf.put(Config.BLOBSTORE_CLEANUP_ENABLE, Boolean.TRUE);
+        store.prepare(nconf, baseDir, nimbusInfo);
+        return store;
     }
 
-    public static void downloadFromHost(Map conf, String file, String localFile, String host, int port) throws IOException, TException, AuthorizationException {
-        NimbusClient client = new NimbusClient (conf, host, port, null);
-        try {
-            download(client, file, localFile);
-        } finally {
-            client.close();
+    /**
+     * Meant to be called only by the supervisor for stormjar/stormconf/stormcode files.
+     * @param key the blobstore key to download
+     * @param localFile the local path to write the blob contents to
+     * @param cb the client blobstore to download from
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     * @throws IOException
+     */
+    public static void downloadResourcesAsSupervisor(String key, String localFile,
+                                                     ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
+        final int MAX_RETRY_ATTEMPTS = 2;
+        final int ATTEMPTS_INTERVAL_TIME = 100; // milliseconds between attempts
+        for (int retryAttempts = 0; retryAttempts < MAX_RETRY_ATTEMPTS; retryAttempts++) {
+            if (downloadResourcesAsSupervisorAttempt(cb, key, localFile)) {
+                break;
+            }
+            Utils.sleep(ATTEMPTS_INTERVAL_TIME);
         }
     }
 
-    private static void download(NimbusClient client, String file, String localFile) throws IOException, TException, AuthorizationException {
-        WritableByteChannel out = Channels.newChannel(new FileOutputStream(localFile));
+    public static ClientBlobStore getClientBlobStore(Map conf) {
+        ClientBlobStore store = (ClientBlobStore) Utils.newInstance((String) conf.get(Config.CLIENT_BLOBSTORE));
+        store.prepare(conf);
+        return store;
+    }
+
+    private static boolean downloadResourcesAsSupervisorAttempt(ClientBlobStore cb, String key, String localFile) {
+        boolean isSuccess = false;
+        FileOutputStream out = null;
+        InputStreamWithMeta in = null;
         try {
-            String id = client.getClient().beginFileDownload(file);
-            while (true) {
-                ByteBuffer chunk = client.getClient().downloadChunk(id);
-                int written = out.write(chunk);
-                if (written == 0) break;
+            out = new FileOutputStream(localFile);
+            in = cb.getBlob(key);
+            long fileSize = in.getFileLength();
+
+            byte[] buffer = new byte[1024];
+            int len;
+            long downloadFileSize = 0;
+            while ((len = in.read(buffer)) >= 0) {
+                out.write(buffer, 0, len);
+                downloadFileSize += len;
             }
+
+            isSuccess = (fileSize == downloadFileSize);
+        } catch (TException | IOException e) {
+            LOG.error("An exception happened while downloading {} from blob store.", localFile, e);
         } finally {
-            out.close();
+            try {
+                if (out != null) {
+                    out.close();
+                }
+            } catch (IOException ignored) {}
+            try {
+                if (in != null) {
+                    in.close();
+                }
+            } catch (IOException ignored) {}
+        }
+        if (!isSuccess) {
+            try {
+                Files.deleteIfExists(Paths.get(localFile));
+            } catch (IOException ex) {
+                LOG.error("Failed trying to delete the partially downloaded {}", localFile, ex);
+            }
+        }
+        return isSuccess;
+    }
+
+    public static boolean checkFileExists(String dir, String file) {
+        return Files.exists(new File(dir, file).toPath());
+    }
+
+    public static long nimbusVersionOfBlob(String key, ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException {
+        ReadableBlobMeta metadata = cb.getBlobMeta(key);
+        return metadata.get_version();
+    }
+
+    public static String getFileOwner(String path) throws IOException {
+        return Files.getOwner(FileSystems.getDefault().getPath(path)).getName();
+    }
+
+    public static long localVersionOfBlob(String localFile) {
+        File f = new File(localFile + DEFAULT_BLOB_VERSION_SUFFIX);
+        long currentVersion = 0;
+        if (f.exists() && !(f.isDirectory())) {
+            BufferedReader br = null;
+            try {
+                br = new BufferedReader(new FileReader(f));
+                String line = br.readLine();
+                currentVersion = Long.parseLong(line);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } finally {
+                try {
+                    if (br != null) {
+                        br.close();
+                    }
+                } catch (Exception ignore) {
+                    LOG.error("Exception trying to cleanup", ignore);
+                }
+            }
+            return currentVersion;
+        } else {
+            return -1;
+        }
+    }
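+
+    // A localized blob is up to date when localVersionOfBlob(localFile) equals
+    // nimbusVersionOfBlob(key, cb); -1 means no local version file exists yet.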
+
+    public static String constructBlobWithVersionFileName(String fileName, long version) {
+        return fileName + "." + version;
+    }
+
+    public static String constructBlobCurrentSymlinkName(String fileName) {
+        return fileName + Utils.DEFAULT_CURRENT_BLOB_SUFFIX;
+    }
+
+    public static String constructVersionFileName(String fileName) {
+        return fileName + Utils.DEFAULT_BLOB_VERSION_SUFFIX;
+    }
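+
+    // e.g. for fileName "mykey" and version 5 these produce:
+    //   constructBlobWithVersionFileName -> "mykey.5"
+    //   constructBlobCurrentSymlinkName  -> "mykey.current"
+    //   constructVersionFileName         -> "mykey.version"
+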
+    // only works on operating systems that support POSIX file permissions
+    public static void restrictPermissions(String baseDir) {
+        try {
+            Set<PosixFilePermission> perms = new HashSet<PosixFilePermission>(
+                    Arrays.asList(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE,
+                            PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ,
+                            PosixFilePermission.GROUP_EXECUTE));
+            Files.setPosixFilePermissions(FileSystems.getDefault().getPath(baseDir), perms);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
 
+
     public static IFn loadClojureFn(String namespace, String name) {
         try {
             clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
@@ -505,6 +634,37 @@ public class Utils {
         return result;
     }
 
+    private static TDeserializer getDes() {
+        TDeserializer des = threadDes.get();
+        if(des == null) {
+            des = new TDeserializer();
+            threadDes.set(des);
+        }
+        return des;
+    }
+
+    public static byte[] thriftSerialize(TBase t) {
+        try {
+            TSerializer ser = threadSer.get();
+            if (ser == null) {
+                ser = new TSerializer();
+                threadSer.set(ser);
+            }
+            return ser.serialize(t);
+        } catch (TException e) {
+            LOG.error("Failed to serialize to thrift: ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static <T> T thriftDeserialize(Class c, byte[] b) {
+        try {
+            return Utils.thriftDeserialize(c, b, 0, b.length);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public static Integer getInt(Object o, Integer defaultValue) {
         if (null == o) {
             return defaultValue;
@@ -571,6 +731,245 @@ public class Utils {
         return UUID.randomUUID().getLeastSignificantBits();
     }
 
+    /**
+     * Unpack all files from a jar into the given directory.
+     *
+     * @param jarFile the .jar file to unpack
+     * @param toDir the destination directory into which to unpack the jar
+     */
+    public static void unJar(File jarFile, File toDir)
+            throws IOException {
+        JarFile jar = new JarFile(jarFile);
+        try {
+            Enumeration<JarEntry> entries = jar.entries();
+            while (entries.hasMoreElements()) {
+                final JarEntry entry = entries.nextElement();
+                if (!entry.isDirectory()) {
+                    InputStream in = jar.getInputStream(entry);
+                    try {
+                        File file = new File(toDir, entry.getName());
+                        ensureDirectory(file.getParentFile());
+                        OutputStream out = new FileOutputStream(file);
+                        try {
+                            copyBytes(in, out, 8192);
+                        } finally {
+                            out.close();
+                        }
+                    } finally {
+                        in.close();
+                    }
+                }
+            }
+        } finally {
+            jar.close();
+        }
+    }
+
+    /**
+     * Copies from one stream to another.
+     *
+     * @param in InputStream to read from
+     * @param out OutputStream to write to
+     * @param buffSize the size of the buffer
+     */
+    public static void copyBytes(InputStream in, OutputStream out, int buffSize)
+            throws IOException {
+        PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
+        byte buf[] = new byte[buffSize];
+        int bytesRead = in.read(buf);
+        while (bytesRead >= 0) {
+            out.write(buf, 0, bytesRead);
+            if ((ps != null) && ps.checkError()) {
+                throw new IOException("Unable to write to output stream.");
+            }
+            bytesRead = in.read(buf);
+        }
+    }
+
+    /**
+     * Ensure the existence of a given directory.
+     *
+     * @throws IOException if it cannot be created and does not already exist
+     */
+    private static void ensureDirectory(File dir) throws IOException {
+        if (!dir.mkdirs() && !dir.isDirectory()) {
+            throw new IOException("Mkdirs failed to create " +
+                    dir.toString());
+        }
+    }
+
+    /**
+     * Given a tar file as input, untars it into the directory passed as the
+     * second parameter.
+     * <p/>
+     * This utility handles ".tar" files as well as ".tar.gz" and ".tgz" files.
+     *
+     * @param inFile   the tar file to unpack
+     * @param untarDir the directory into which to untar the file
+     * @throws IOException
+     */
+    public static void unTar(File inFile, File untarDir) throws IOException {
+        if (!untarDir.mkdirs()) {
+            if (!untarDir.isDirectory()) {
+                throw new IOException("Mkdirs failed to create " + untarDir);
+            }
+        }
+
+        boolean gzipped = inFile.toString().endsWith("gz");
+        if (onWindows()) {
+            // Tar is not native to Windows. Use simple Java based implementation for
+            // tests and simple tar archives
+            unTarUsingJava(inFile, untarDir, gzipped);
+        } else {
+            // spawn tar utility to untar archive for full fledged unix behavior such
+            // as resolving symlinks in tar archives
+            unTarUsingTar(inFile, untarDir, gzipped);
+        }
+    }
+
+    private static void unTarUsingTar(File inFile, File untarDir,
+                                      boolean gzipped) throws IOException {
+        StringBuffer untarCommand = new StringBuffer();
+        if (gzipped) {
+            untarCommand.append(" gzip -dc '");
+            untarCommand.append(inFile.toString());
+            untarCommand.append("' | (");
+        }
+        untarCommand.append("cd '");
+        untarCommand.append(untarDir.toString());
+        untarCommand.append("' ; ");
+        untarCommand.append("tar -xf ");
+
+        if (gzipped) {
+            untarCommand.append(" -)");
+        } else {
+            untarCommand.append(inFile.toString());
+        }
+        String[] shellCmd = {"bash", "-c", untarCommand.toString()};
+        ShellUtils.ShellCommandExecutor shexec = new ShellUtils.ShellCommandExecutor(shellCmd);
+        shexec.execute();
+        int exitcode = shexec.getExitCode();
+        if (exitcode != 0) {
+            throw new IOException("Error untarring file " + inFile +
+                    ". Tar process exited with exit code " + exitcode);
+        }
+    }
+
+    private static void unTarUsingJava(File inFile, File untarDir,
+                                       boolean gzipped) throws IOException {
+        InputStream inputStream = null;
+        TarArchiveInputStream tis = null;
+        try {
+            if (gzipped) {
+                inputStream = new BufferedInputStream(new GZIPInputStream(
+                        new FileInputStream(inFile)));
+            } else {
+                inputStream = new BufferedInputStream(new FileInputStream(inFile));
+            }
+            tis = new TarArchiveInputStream(inputStream);
+            for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
+                unpackEntries(tis, entry, untarDir);
+                entry = tis.getNextTarEntry();
+            }
+        } finally {
+            cleanup(tis, inputStream);
+        }
+    }
+
+    /**
+     * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
+     * null pointers. Must only be used for cleanup in exception handlers.
+     *
+     * @param closeables the objects to close
+     */
+    private static void cleanup(java.io.Closeable... closeables) {
+        for (java.io.Closeable c : closeables) {
+            if (c != null) {
+                try {
+                    c.close();
+                } catch (IOException e) {
+                    LOG.debug("Exception in closing " + c, e);
+
+                }
+            }
+        }
+    }
+
+    private static void unpackEntries(TarArchiveInputStream tis,
+                                      TarArchiveEntry entry, File outputDir) throws IOException {
+        if (entry.isDirectory()) {
+            File subDir = new File(outputDir, entry.getName());
+            if (!subDir.mkdirs() && !subDir.isDirectory()) {
+                throw new IOException("Mkdirs failed to create tar internal dir "
+                        + subDir);
+            }
+            for (TarArchiveEntry e : entry.getDirectoryEntries()) {
+                unpackEntries(tis, e, subDir);
+            }
+            return;
+        }
+        File outputFile = new File(outputDir, entry.getName());
+        if (!outputFile.getParentFile().exists()) {
+            if (!outputFile.getParentFile().mkdirs()) {
+                throw new IOException("Mkdirs failed to create tar internal dir "
+                        + outputFile.getParentFile());
+            }
+        }
+        int count;
+        byte data[] = new byte[2048];
+        BufferedOutputStream outputStream = new BufferedOutputStream(
+                new FileOutputStream(outputFile));
+
+        while ((count = tis.read(data)) != -1) {
+            outputStream.write(data, 0, count);
+        }
+        outputStream.flush();
+        outputStream.close();
+    }
+
+    public static boolean onWindows() {
+        if (System.getenv("OS") != null) {
+            return System.getenv("OS").equals("Windows_NT");
+        }
+        return false;
+    }
+
+    public static void unpack(File localrsrc, File dst) throws IOException {
+        String lowerName = localrsrc.getName().toLowerCase();
+        if (lowerName.endsWith(".jar")) {
+            unJar(localrsrc, dst);
+        } else if (lowerName.endsWith(".zip")) {
+            unZip(localrsrc, dst);
+        } else if (lowerName.endsWith(".tar.gz") ||
+                lowerName.endsWith(".tgz") ||
+                lowerName.endsWith(".tar")) {
+            unTar(localrsrc, dst);
+            unTar(localrsrc, dst);
+        } else {
+            LOG.warn("Cannot unpack " + localrsrc);
+            if (!localrsrc.renameTo(dst)) {
+                throw new IOException("Unable to rename file: [" + localrsrc
+                        + "] to [" + dst + "]");
+            }
+        }
+        if (localrsrc.isFile()) {
+            localrsrc.delete();
+        }
+    }
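+
+    // e.g. unpack(new File("lib.jar"), dst) explodes the jar into dst, and
+    // unpack(new File("data.tgz"), dst) untars into dst; an unrecognized
+    // extension is simply renamed to dst. A source archive file is deleted afterwards.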
+
+    public static boolean canUserReadBlob(ReadableBlobMeta meta, String user) {
+        SettableBlobMeta settable = meta.get_settable();
+        for (AccessControl acl : settable.get_acl()) {
+            if (acl.get_type().equals(AccessControlType.OTHER) && (acl.get_access() & BlobStoreAclHandler.READ) > 0) {
+                return true;
+            }
+            // OTHER entries may carry no name, so compare from the caller-supplied user to avoid an NPE.
+            if (user.equals(acl.get_name()) && (acl.get_access() & BlobStoreAclHandler.READ) > 0) {
+                return true;
+            }
+        }
+        return false;
+    }
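
A sketch of exercising this ACL check, assuming the thrift-generated set_* methods mirror the get_* methods used above:

    AccessControl acl = new AccessControl();
    acl.set_type(AccessControlType.OTHER);
    acl.set_access(BlobStoreAclHandler.READ);          // world-readable entry
    SettableBlobMeta settable = new SettableBlobMeta();
    settable.set_acl(Arrays.asList(acl));
    ReadableBlobMeta meta = new ReadableBlobMeta();
    meta.set_settable(settable);
    boolean ok = Utils.canUserReadBlob(meta, "alice"); // true via the OTHER rule
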
+
     public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) {
         return newCurator(conf, servers, port, root, null);
     }
@@ -702,6 +1101,38 @@ public class Utils {
         return ret;
     }
 
+    /**
+     * Returns the total disk usage, in bytes, of a local file or directory tree.
+     * Symbolic links are skipped. Very basic implementation.
+     *
+     * @param dir the local file or directory to measure
+     * @return the total size in bytes of the given file or directory tree
+     */
+    public static long getDU(File dir) {
+        if (!dir.exists()) {
+            return 0;
+        }
+        if (!dir.isDirectory()) {
+            return dir.length();
+        }
+        long size = 0;
+        File[] allFiles = dir.listFiles();
+        if (allFiles != null) {
+            for (File file : allFiles) {
+                boolean isSymLink;
+                try {
+                    isSymLink = org.apache.commons.io.FileUtils.isSymlink(file);
+                } catch (IOException ioe) {
+                    // If we cannot tell, err on the side of not following the link.
+                    isSymLink = true;
+                }
+                if (!isSymLink) {
+                    size += getDU(file);
+                }
+            }
+        }
+        return size;
+    }
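
For example, a caller bounding a local cache could measure a user directory like so (path hypothetical):

    long bytes = Utils.getDU(new File("/var/storm/supervisor/usercache/alice"));
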
+
     public static String threadDump() {
         final StringBuilder dump = new StringBuilder();
         final java.lang.management.ThreadMXBean threadMXBean =  java.lang.management.ManagementFactory.getThreadMXBean();

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index 44ec967..f1be007 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -477,8 +477,16 @@ public class ConfigValidation {
         }
     }
 
-    public static class PacemakerAuthTypeValidator extends Validator {
+    public static class MapOfStringToMapOfStringToObjectValidator extends Validator {
+        @Override
+        public void validateField(String name, Object o) {
+            ConfigValidationUtils.NestableFieldValidator validator =
+                    ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false),
+                            ConfigValidationUtils.mapFv(String.class, Object.class, true), true);
+            validator.validateField(name, o);
+        }
+    }
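
To make the accepted shape concrete, a sketch of a value this validator passes and one it rejects (config and key names hypothetical):

    Map<String, Map<String, Object>> ok = new HashMap<>();
    ok.put("group-a", Collections.singletonMap("memory.mb", (Object) 1024));
    new MapOfStringToMapOfStringToObjectValidator().validateField("some.config", ok);  // passes

    new MapOfStringToMapOfStringToObjectValidator().validateField("some.config", 42);  // throws IllegalArgumentException
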
 
+    public static class PacemakerAuthTypeValidator extends Validator {
         @Override
         public void validateField(String name, Object o) {
             if(o == null) {
@@ -486,9 +494,9 @@ public class ConfigValidation {
             }
 
             if(o instanceof String &&
-               (((String)o).equals("NONE") ||
-                ((String)o).equals("DIGEST") ||
-                ((String)o).equals("KERBEROS"))) {
+                    (((String)o).equals("NONE") ||
+                            ((String)o).equals("DIGEST") ||
+                            ((String)o).equals("KERBEROS"))) {
                 return;
             }
             throw new IllegalArgumentException( "Field " + name + " must be one of \"NONE\", \"DIGEST\", or \"KERBEROS\"");
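
For reference, this guards a plain string setting, e.g. a storm.yaml entry along the lines of (config key assumed to be pacemaker.auth.method):

    pacemaker.auth.method: "KERBEROS"
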

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
index ed93370..cb742fa 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
@@ -51,7 +51,6 @@ public class ConfigValidationAnnotations {
     /**
      * Validators with fields: validatorClass and type
      */
-
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isType {
@@ -82,7 +81,6 @@ public class ConfigValidationAnnotations {
     /**
      * Validators with fields: validatorClass
      */
-
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isString {
@@ -109,7 +107,7 @@ public class ConfigValidationAnnotations {
     }
 
     /**
-     * validates on object is not null
+     * Validates that the object is not null
      */
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
@@ -118,7 +116,7 @@ public class ConfigValidationAnnotations {
     }
 
     /**
-     * validates that there are no duplicates in a list
+     * Validates that there are no duplicates in a list
      */
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
@@ -142,7 +140,6 @@ public class ConfigValidationAnnotations {
      * Validates the type of each key and value in a map
      * Validator with fields: validatorClass, keyValidatorClass, valueValidatorClass
      */
-
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isMapEntryType {
@@ -168,7 +165,7 @@ public class ConfigValidationAnnotations {
     }
 
     /**
-     * checks if a number is positive and whether zero inclusive
+     * Checks that a number is positive, optionally allowing zero
      * Validator with fields: validatorClass, includeZero
      */
     @Retention(RetentionPolicy.RUNTIME)
@@ -182,7 +179,6 @@ public class ConfigValidationAnnotations {
     /**
      * Complex/custom type validators
      */
-
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface isStringOrStringList {
@@ -204,7 +200,6 @@ public class ConfigValidationAnnotations {
     /**
      * For custom validators
      */
-
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
     public @interface CustomValidator {

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/py/storm/Nimbus-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote
index 63e7dce..5b8e396 100644
--- a/storm-core/src/py/storm/Nimbus-remote
+++ b/storm-core/src/py/storm/Nimbus-remote
@@ -55,6 +55,20 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
   print('  void setWorkerProfiler(string id, ProfileRequest profileRequest)')
   print('   getComponentPendingProfileActions(string id, string component_id, ProfileAction action)')
   print('  void uploadNewCredentials(string name, Credentials creds)')
+  print('  string beginCreateBlob(string key, SettableBlobMeta meta)')
+  print('  string beginUpdateBlob(string key)')
+  print('  void uploadBlobChunk(string session, string chunk)')
+  print('  void finishBlobUpload(string session)')
+  print('  void cancelBlobUpload(string session)')
+  print('  ReadableBlobMeta getBlobMeta(string key)')
+  print('  void setBlobMeta(string key, SettableBlobMeta meta)')
+  print('  BeginDownloadResult beginBlobDownload(string key)')
+  print('  string downloadBlobChunk(string session)')
+  print('  void deleteBlob(string key)')
+  print('  ListBlobsResult listBlobs(string session)')
+  print('  i32 getBlobReplication(string key)')
+  print('  i32 updateBlobReplication(string key, i32 replication)')
+  print('  void createStateInZookeeper(string key)')
   print('  string beginFileUpload()')
   print('  void uploadChunk(string location, string chunk)')
   print('  void finishFileUpload(string location)')
@@ -204,6 +218,90 @@ elif cmd == 'uploadNewCredentials':
     sys.exit(1)
   pp.pprint(client.uploadNewCredentials(args[0],eval(args[1]),))
 
+elif cmd == 'beginCreateBlob':
+  if len(args) != 2:
+    print('beginCreateBlob requires 2 args')
+    sys.exit(1)
+  pp.pprint(client.beginCreateBlob(args[0],eval(args[1]),))
+
+elif cmd == 'beginUpdateBlob':
+  if len(args) != 1:
+    print('beginUpdateBlob requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.beginUpdateBlob(args[0],))
+
+elif cmd == 'uploadBlobChunk':
+  if len(args) != 2:
+    print('uploadBlobChunk requires 2 args')
+    sys.exit(1)
+  pp.pprint(client.uploadBlobChunk(args[0],args[1],))
+
+elif cmd == 'finishBlobUpload':
+  if len(args) != 1:
+    print('finishBlobUpload requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.finishBlobUpload(args[0],))
+
+elif cmd == 'cancelBlobUpload':
+  if len(args) != 1:
+    print('cancelBlobUpload requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.cancelBlobUpload(args[0],))
+
+elif cmd == 'getBlobMeta':
+  if len(args) != 1:
+    print('getBlobMeta requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.getBlobMeta(args[0],))
+
+elif cmd == 'setBlobMeta':
+  if len(args) != 2:
+    print('setBlobMeta requires 2 args')
+    sys.exit(1)
+  pp.pprint(client.setBlobMeta(args[0],eval(args[1]),))
+
+elif cmd == 'beginBlobDownload':
+  if len(args) != 1:
+    print('beginBlobDownload requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.beginBlobDownload(args[0],))
+
+elif cmd == 'downloadBlobChunk':
+  if len(args) != 1:
+    print('downloadBlobChunk requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.downloadBlobChunk(args[0],))
+
+elif cmd == 'deleteBlob':
+  if len(args) != 1:
+    print('deleteBlob requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.deleteBlob(args[0],))
+
+elif cmd == 'listBlobs':
+  if len(args) != 1:
+    print('listBlobs requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.listBlobs(args[0],))
+
+elif cmd == 'getBlobReplication':
+  if len(args) != 1:
+    print('getBlobReplication requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.getBlobReplication(args[0],))
+
+elif cmd == 'updateBlobReplication':
+  if len(args) != 2:
+    print('updateBlobReplication requires 2 args')
+    sys.exit(1)
+  pp.pprint(client.updateBlobReplication(args[0],eval(args[1]),))
+
+elif cmd == 'createStateInZookeeper':
+  if len(args) != 1:
+    print('createStateInZookeeper requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.createStateInZookeeper(args[0],))
+
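
A sketch of exercising the new blob commands from a shell, assuming the standard thrift-generated -h host[:port] option and a hypothetical nimbus host and blob key:

  ./Nimbus-remote -h nimbus.example.com:6627 getBlobMeta my-blob-key
  ./Nimbus-remote -h nimbus.example.com:6627 updateBlobReplication my-blob-key 3
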
 elif cmd == 'beginFileUpload':
   if len(args) != 0:
     print('beginFileUpload requires 0 args')