You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/09/28 07:21:35 UTC

[1/9] storm git commit: STORM-2084: Refactor localization to combine files together

Repository: storm
Updated Branches:
  refs/heads/master b5be1d6b0 -> 5c05bb74c


http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java
deleted file mode 100644
index 70e8e9d..0000000
--- a/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java
+++ /dev/null
@@ -1,682 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.localizer;
-
-import org.apache.storm.Config;
-import org.apache.storm.DaemonConfig;
-import org.apache.storm.blobstore.BlobStoreAclHandler;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.blobstore.LocalFsBlobStore;
-import org.apache.storm.generated.AccessControl;
-import org.apache.storm.generated.AccessControlType;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.ReadableBlobMeta;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.storm.utils.ServerUtils;
-import org.apache.storm.utils.Utils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import com.google.common.base.Joiner;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.util.*;
-
-import static org.apache.storm.localizer.Localizer.USERCACHE;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-
-public class LocalizerTest {
-
-  private File baseDir;
-
-  private final String user1 = "user1";
-  private final String user2 = "user2";
-  private final String user3 = "user3";
-
-  private ClientBlobStore mockblobstore = mock(ClientBlobStore.class);
-
-
-  class TestLocalizer extends Localizer {
-
-    TestLocalizer(Map<String, Object> conf, String baseDir) {
-      super(conf, baseDir);
-    }
-
-    @Override
-    protected ClientBlobStore getClientBlobStore() {
-      return mockblobstore;
-    }
-  }
-
-  class TestInputStreamWithMeta extends InputStreamWithMeta {
-    private InputStream iostream;
-
-    public TestInputStreamWithMeta() {
-      iostream = IOUtils.toInputStream("some test data for my input stream");
-    }
-
-    public TestInputStreamWithMeta(InputStream istream) {
-       iostream = istream;
-    }
-
-    @Override
-    public long getVersion() throws IOException {
-      return 1;
-    }
-
-    @Override
-    public synchronized int read() {
-      return 0;
-    }
-
-    @Override
-    public synchronized int read(byte[] b)
-    throws IOException {
-      int length = iostream.read(b);
-      if (length == 0) {
-        return -1;
-      }
-      return length;
-    }
-
-    @Override
-    public long getFileLength() {
-        return 0;
-    }
-  };
-
-  @Before
-  public void setUp() throws Exception {
-    baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-"+ UUID.randomUUID());
-    if (!baseDir.mkdir()) {
-      throw new IOException("failed to create base directory");
-    }
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    try {
-      FileUtils.deleteDirectory(baseDir);
-    } catch (IOException ignore) {}
-  }
-
-  protected String joinPath(String... pathList) {
-      return Joiner.on(File.separator).join(pathList);
-  }
-
-  public String constructUserCacheDir(String base, String user) {
-    return joinPath(base, USERCACHE, user);
-  }
-
-  public String constructExpectedFilesDir(String base, String user) {
-    return joinPath(constructUserCacheDir(base, user), Localizer.FILECACHE, Localizer.FILESDIR);
-  }
-
-  public String constructExpectedArchivesDir(String base, String user) {
-    return joinPath(constructUserCacheDir(base, user), Localizer.FILECACHE, Localizer.ARCHIVESDIR);
-  }
-
-  @Test
-  public void testDirPaths() throws Exception {
-    Map<String, Object> conf = new HashMap();
-    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
-
-    String expectedDir = constructUserCacheDir(baseDir.toString(), user1);
-    assertEquals("get local user dir doesn't return right value",
-        expectedDir, localizer.getLocalUserDir(user1).toString());
-
-    String expectedFileDir = joinPath(expectedDir, Localizer.FILECACHE);
-    assertEquals("get local user file dir doesn't return right value",
-        expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString());
-  }
-
-  @Test
-  public void testReconstruct() throws Exception {
-    Map<String, Object> conf = new HashMap();
-
-    String expectedFileDir1 = constructExpectedFilesDir(baseDir.toString(), user1);
-    String expectedArchiveDir1 = constructExpectedArchivesDir(baseDir.toString(), user1);
-    String expectedFileDir2 = constructExpectedFilesDir(baseDir.toString(), user2);
-    String expectedArchiveDir2 = constructExpectedArchivesDir(baseDir.toString(), user2);
-
-    String key1 = "testfile1.txt";
-    String key2 = "testfile2.txt";
-    String key3 = "testfile3.txt";
-    String key4 = "testfile4.txt";
-
-    String archive1 = "archive1";
-    String archive2 = "archive2";
-
-    File user1file1 = new File(expectedFileDir1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-    File user1file2 = new File(expectedFileDir1, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-    File user2file3 = new File(expectedFileDir2, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-    File user2file4 = new File(expectedFileDir2, key4 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-
-    File user1archive1 = new File(expectedArchiveDir1, archive1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-    File user2archive2 = new File(expectedArchiveDir2, archive2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-    File user1archive1file = new File(user1archive1, "file1");
-    File user2archive2file = new File(user2archive2, "file2");
-
-    // setup some files/dirs to emulate supervisor restart
-    assertTrue("Failed setup filecache dir1", new File(expectedFileDir1).mkdirs());
-    assertTrue("Failed setup filecache dir2", new File(expectedFileDir2).mkdirs());
-    assertTrue("Failed setup file1", user1file1.createNewFile());
-    assertTrue("Failed setup file2", user1file2.createNewFile());
-    assertTrue("Failed setup file3", user2file3.createNewFile());
-    assertTrue("Failed setup file4", user2file4.createNewFile());
-    assertTrue("Failed setup archive dir1", user1archive1.mkdirs());
-    assertTrue("Failed setup archive dir2", user2archive2.mkdirs());
-    assertTrue("Failed setup file in archivedir1", user1archive1file.createNewFile());
-    assertTrue("Failed setup file in archivedir2", user2archive2file.createNewFile());
-
-    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
-
-    ArrayList<LocalResource> arrUser1Keys = new ArrayList<LocalResource>();
-    arrUser1Keys.add(new LocalResource(key1, false));
-    arrUser1Keys.add(new LocalResource(archive1, true));
-    localizer.addReferences(arrUser1Keys, user1, "topo1");
-
-    LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
-    assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
-    assertEquals("user doesn't match", user1, lrsrcSet.getUser());
-    LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
-    assertNotNull("Local resource doesn't exist but should", key1rsrc);
-    assertEquals("key doesn't match", key1, key1rsrc.getKey());
-    assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
-    LocalizedResource key2rsrc = lrsrcSet.get(key2, false);
-    assertNotNull("Local resource doesn't exist but should", key2rsrc);
-    assertEquals("key doesn't match", key2, key2rsrc.getKey());
-    assertEquals("refcount doesn't match", 0, key2rsrc.getRefCount());
-    LocalizedResource archive1rsrc = lrsrcSet.get(archive1, true);
-    assertNotNull("Local resource doesn't exist but should", archive1rsrc);
-    assertEquals("key doesn't match", archive1, archive1rsrc.getKey());
-    assertEquals("refcount doesn't match", 1, archive1rsrc.getRefCount());
-
-    LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
-    assertEquals("local resource set size wrong", 3, lrsrcSet2.getSize());
-    assertEquals("user doesn't match", user2, lrsrcSet2.getUser());
-    LocalizedResource key3rsrc = lrsrcSet2.get(key3, false);
-    assertNotNull("Local resource doesn't exist but should", key3rsrc);
-    assertEquals("key doesn't match", key3, key3rsrc.getKey());
-    assertEquals("refcount doesn't match", 0, key3rsrc.getRefCount());
-    LocalizedResource key4rsrc = lrsrcSet2.get(key4, false);
-    assertNotNull("Local resource doesn't exist but should", key4rsrc);
-    assertEquals("key doesn't match", key4, key4rsrc.getKey());
-    assertEquals("refcount doesn't match", 0, key4rsrc.getRefCount());
-    LocalizedResource archive2rsrc = lrsrcSet2.get(archive2, true);
-    assertNotNull("Local resource doesn't exist but should", archive2rsrc);
-    assertEquals("key doesn't match", archive2, archive2rsrc.getKey());
-    assertEquals("refcount doesn't match", 0, archive2rsrc.getRefCount());
-  }
-
-  @Test
-  public void testArchivesTgz() throws Exception {
-    testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tgz")), true, 21344);
-  }
-
-  @Test
-  public void testArchivesZip() throws Exception {
-    testArchives(getFileFromResource(joinPath("localizer", "localtest.zip")), false, 21348);
-  }
-
-  @Test
-  public void testArchivesTarGz() throws Exception {
-    testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar.gz")), true, 21344);
-  }
-
-  @Test
-  public void testArchivesTar() throws Exception {
-    testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar")), true, 21344);
-  }
-
-  @Test
-  public void testArchivesJar() throws Exception {
-    testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.jar")), false, 21416);
-  }
-
-  private File getFileFromResource(String archivePath) {
-    ClassLoader classLoader = getClass().getClassLoader();
-    return new File(classLoader.getResource(archivePath).getFile());
-  }
-
-  // archive passed in must contain symlink named tmptestsymlink if not a zip file
-  public void testArchives(File archiveFile, boolean supportSymlinks, int size) throws Exception {
-    if (Utils.isOnWindows()) {
-      // Windows should set this to false cause symlink in compressed file doesn't work properly.
-      supportSymlinks = false;
-    }
-
-    Map<String, Object> conf = new HashMap();
-    // set clean time really high so doesn't kick in
-    conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
-
-    String key1 = archiveFile.getName();
-    String topo1 = "topo1";
-    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
-    // set really small so will do cleanup
-    localizer.setTargetCacheSize(1);
-
-    ReadableBlobMeta rbm = new ReadableBlobMeta();
-    rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
-    when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
-
-    when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new
-        FileInputStream(archiveFile.getAbsolutePath())));
-
-    long timeBefore = System.nanoTime();
-    File user1Dir = localizer.getLocalUserFileCacheDir(user1);
-    assertTrue("failed to create user dir", user1Dir.mkdirs());
-    LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, true), user1, topo1,
-        user1Dir);
-    long timeAfter = System.nanoTime();
-
-    String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
-    String expectedFileDir = joinPath(expectedUserDir, Localizer.FILECACHE, Localizer.ARCHIVESDIR);
-    assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
-    File keyFile = new File(expectedFileDir, key1 + ".0");
-    assertTrue("blob not created", keyFile.exists());
-    assertTrue("blob is not uncompressed", keyFile.isDirectory());
-    File symlinkFile = new File(keyFile, "tmptestsymlink");
-
-    if (supportSymlinks) {
-      assertTrue("blob uncompressed doesn't contain symlink", Files.isSymbolicLink(
-          symlinkFile.toPath()));
-    } else {
-      assertTrue("blob symlink file doesn't exist", symlinkFile.exists());
-    }
-
-    LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
-    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
-    assertEquals("user doesn't match", user1, lrsrcSet.getUser());
-    LocalizedResource key1rsrc = lrsrcSet.get(key1, true);
-    assertNotNull("Local resource doesn't exist but should", key1rsrc);
-    assertEquals("key doesn't match", key1, key1rsrc.getKey());
-    assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
-    assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePathWithVersion());
-    assertEquals("size doesn't match", size, key1rsrc.getSize());
-    assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
-        .getLastAccessTime() <= timeAfter));
-
-    timeBefore = System.nanoTime();
-    localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, true);
-    timeAfter = System.nanoTime();
-
-    lrsrcSet = localizer.getUserResources().get(user1);
-    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
-    key1rsrc = lrsrcSet.get(key1, true);
-    assertNotNull("Local resource doesn't exist but should", key1rsrc);
-    assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
-    assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
-        .getLastAccessTime() <= timeAfter));
-
-    // should remove the blob since cache size set really small
-    localizer.handleCacheCleanup();
-
-    lrsrcSet = localizer.getUserResources().get(user1);
-    assertFalse("blob contents not deleted", symlinkFile.exists());
-    assertFalse("blob not deleted", keyFile.exists());
-    assertFalse("blob file dir not deleted", new File(expectedFileDir).exists());
-    assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
-    assertNull("user set should be null", lrsrcSet);
-
-  }
-
-  @Test
-  public void testBasic() throws Exception {
-    Map<String, Object> conf = new HashMap();
-    // set clean time really high so doesn't kick in
-    conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
-
-    String key1 = "key1";
-    String topo1 = "topo1";
-    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
-    // set really small so will do cleanup
-    localizer.setTargetCacheSize(1);
-
-    ReadableBlobMeta rbm = new ReadableBlobMeta();
-    rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
-    when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
-
-    when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
-
-    long timeBefore = System.nanoTime();
-    File user1Dir = localizer.getLocalUserFileCacheDir(user1);
-    assertTrue("failed to create user dir", user1Dir.mkdirs());
-    LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
-        user1Dir);
-    long timeAfter = System.nanoTime();
-
-    String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
-    String expectedFileDir = joinPath(expectedUserDir, Localizer.FILECACHE, Localizer.FILESDIR);
-    assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
-    File keyFile = new File(expectedFileDir, key1);
-    File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-
-    assertTrue("blob not created", keyFileCurrentSymlink.exists());
-
-    LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
-    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
-    assertEquals("user doesn't match", user1, lrsrcSet.getUser());
-    LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
-    assertNotNull("Local resource doesn't exist but should", key1rsrc);
-    assertEquals("key doesn't match", key1, key1rsrc.getKey());
-    assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
-    assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePath());
-    assertEquals("size doesn't match", 34, key1rsrc.getSize());
-    assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
-        .getLastAccessTime() <= timeAfter));
-
-    timeBefore = System.nanoTime();
-    localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
-    timeAfter = System.nanoTime();
-
-    lrsrcSet = localizer.getUserResources().get(user1);
-    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
-    key1rsrc = lrsrcSet.get(key1, false);
-    assertNotNull("Local resource doesn't exist but should", key1rsrc);
-    assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
-    assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
-        .getLastAccessTime() <= timeAfter));
-
-    // should remove the blob since cache size set really small
-    localizer.handleCacheCleanup();
-
-    lrsrcSet = localizer.getUserResources().get(user1);
-    assertNull("user set should be null", lrsrcSet);
-    assertFalse("blob not deleted", keyFile.exists());
-    assertFalse("blob dir not deleted", new File(expectedFileDir).exists());
-    assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
-  }
-
-  @Test
-  public void testMultipleKeysOneUser() throws Exception {
-    Map<String, Object> conf = new HashMap();
-    // set clean time really high so doesn't kick in
-    conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
-
-    String key1 = "key1";
-    String topo1 = "topo1";
-    String key2 = "key2";
-    String key3 = "key3";
-    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
-    // set to keep 2 blobs (each of size 34)
-    localizer.setTargetCacheSize(68);
-
-    ReadableBlobMeta rbm = new ReadableBlobMeta();
-    rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
-    when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
-    when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
-    when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta());
-    when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta());
-
-    List<LocalResource> keys = Arrays.asList(new LocalResource[]{new LocalResource(key1, false),
-        new LocalResource(key2, false), new LocalResource(key3, false)});
-    File user1Dir = localizer.getLocalUserFileCacheDir(user1);
-    assertTrue("failed to create user dir", user1Dir.mkdirs());
-
-    List<LocalizedResource> lrsrcs = localizer.getBlobs(keys, user1, topo1, user1Dir);
-    LocalizedResource lrsrc = lrsrcs.get(0);
-    LocalizedResource lrsrc2 = lrsrcs.get(1);
-    LocalizedResource lrsrc3 = lrsrcs.get(2);
-
-    String expectedFileDir = joinPath(baseDir.toString(), USERCACHE, user1,
-        Localizer.FILECACHE, Localizer.FILESDIR);
-    assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
-    File keyFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-    File keyFile2 = new File(expectedFileDir, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-    File keyFile3 = new File(expectedFileDir, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-
-    assertTrue("blob not created", keyFile.exists());
-    assertTrue("blob not created", keyFile2.exists());
-    assertTrue("blob not created", keyFile3.exists());
-    assertEquals("size doesn't match", 34, keyFile.length());
-    assertEquals("size doesn't match", 34, keyFile2.length());
-    assertEquals("size doesn't match", 34, keyFile3.length());
-    assertEquals("size doesn't match", 34, lrsrc.getSize());
-    assertEquals("size doesn't match", 34, lrsrc3.getSize());
-    assertEquals("size doesn't match", 34, lrsrc2.getSize());
-
-    LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
-    assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
-    assertEquals("user doesn't match", user1, lrsrcSet.getUser());
-
-    long timeBefore = System.nanoTime();
-    localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
-    localizer.removeBlobReference(lrsrc2.getKey(), user1, topo1, false);
-    localizer.removeBlobReference(lrsrc3.getKey(), user1, topo1, false);
-    long timeAfter = System.nanoTime();
-
-    // add reference to one and then remove reference again so it has newer timestamp
-    lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
-    assertTrue("timestamp not within range", (lrsrc.getLastAccessTime() >= timeBefore && lrsrc
-        .getLastAccessTime() <= timeAfter));
-    localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
-
-    // should remove the second blob first
-    localizer.handleCacheCleanup();
-
-    lrsrcSet = localizer.getUserResources().get(user1);
-    assertEquals("local resource set size wrong", 2, lrsrcSet.getSize());
-    long end = System.currentTimeMillis() + 100;
-    while ((end - System.currentTimeMillis()) >= 0 && keyFile2.exists()) {
-      Thread.sleep(1);
-    }
-    assertFalse("blob not deleted", keyFile2.exists());
-    assertTrue("blob deleted", keyFile.exists());
-    assertTrue("blob deleted", keyFile3.exists());
-
-    // set size to cleanup another one
-    localizer.setTargetCacheSize(34);
-
-    // should remove the third blob
-    localizer.handleCacheCleanup();
-
-    lrsrcSet = localizer.getUserResources().get(user1);
-    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
-    assertTrue("blob deleted", keyFile.exists());
-    assertFalse("blob not deleted", keyFile3.exists());
-  }
-
-  @Test(expected = AuthorizationException.class)
-  public void testFailAcls() throws Exception {
-    Map<String, Object> conf = new HashMap();
-    // set clean time really high so doesn't kick in
-    conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
-    // enable blobstore acl validation
-    conf.put(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED, true);
-
-    String topo1 = "topo1";
-    String key1 = "key1";
-    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
-
-    ReadableBlobMeta rbm = new ReadableBlobMeta();
-    // set acl so user doesn't have read access
-    AccessControl acl = new AccessControl(AccessControlType.USER, BlobStoreAclHandler.ADMIN);
-    acl.set_name(user1);
-    rbm.set_settable(new SettableBlobMeta(Arrays.asList(acl)));
-    when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
-    when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
-    File user1Dir = localizer.getLocalUserFileCacheDir(user1);
-    assertTrue("failed to create user dir", user1Dir.mkdirs());
-
-    // This should throw AuthorizationException because auth failed
-    localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
-  }
-
-  @Test(expected = KeyNotFoundException.class)
-  public void testKeyNotFoundException() throws Exception {
-    Map<String, Object> conf = Utils.readStormConfig();
-    String key1 = "key1";
-    conf.put(Config.STORM_LOCAL_DIR, "target");
-    LocalFsBlobStore bs = new LocalFsBlobStore();
-    LocalFsBlobStore spy = spy(bs);
-    Mockito.doReturn(true).when(spy).checkForBlobOrDownload(key1);
-    Mockito.doNothing().when(spy).checkForBlobUpdate(key1);
-    spy.prepare(conf,null,null);
-    spy.getBlob(key1, null);
-  }
-
-    @Test
-  public void testMultipleUsers() throws Exception {
-    Map<String, Object> conf = new HashMap();
-    // set clean time really high so doesn't kick in
-    conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
-
-    String topo1 = "topo1";
-    String topo2 = "topo2";
-    String topo3 = "topo3";
-    String key1 = "key1";
-    String key2 = "key2";
-    String key3 = "key3";
-    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
-    // set to keep 2 blobs (each of size 34)
-    localizer.setTargetCacheSize(68);
-
-    ReadableBlobMeta rbm = new ReadableBlobMeta();
-    rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
-    when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
-    when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
-    when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta());
-    when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta());
-
-    File user1Dir = localizer.getLocalUserFileCacheDir(user1);
-    assertTrue("failed to create user dir", user1Dir.mkdirs());
-    File user2Dir = localizer.getLocalUserFileCacheDir(user2);
-    assertTrue("failed to create user dir", user2Dir.mkdirs());
-    File user3Dir = localizer.getLocalUserFileCacheDir(user3);
-    assertTrue("failed to create user dir", user3Dir.mkdirs());
-
-    LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
-        user1Dir);
-    LocalizedResource lrsrc2 = localizer.getBlob(new LocalResource(key2, false), user2, topo2,
-        user2Dir);
-    LocalizedResource lrsrc3 = localizer.getBlob(new LocalResource(key3, false), user3, topo3,
-        user3Dir);
-    // make sure we support different user reading same blob
-    LocalizedResource lrsrc1_user3 = localizer.getBlob(new LocalResource(key1, false), user3,
-        topo3, user3Dir);
-
-    String expectedUserDir1 = joinPath(baseDir.toString(), USERCACHE, user1);
-    String expectedFileDirUser1 = joinPath(expectedUserDir1, Localizer.FILECACHE, Localizer.FILESDIR);
-    String expectedFileDirUser2 = joinPath(baseDir.toString(), USERCACHE, user2,
-        Localizer.FILECACHE, Localizer.FILESDIR);
-    String expectedFileDirUser3 = joinPath(baseDir.toString(), USERCACHE, user3,
-        Localizer.FILECACHE, Localizer.FILESDIR);
-    assertTrue("user filecache dir user1 not created", new File(expectedFileDirUser1).exists());
-    assertTrue("user filecache dir user2 not created", new File(expectedFileDirUser2).exists());
-    assertTrue("user filecache dir user3 not created", new File(expectedFileDirUser3).exists());
-
-    File keyFile = new File(expectedFileDirUser1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-    File keyFile2 = new File(expectedFileDirUser2, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-    File keyFile3 = new File(expectedFileDirUser3, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-    File keyFile1user3 = new File(expectedFileDirUser3, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-
-    assertTrue("blob not created", keyFile.exists());
-    assertTrue("blob not created", keyFile2.exists());
-    assertTrue("blob not created", keyFile3.exists());
-    assertTrue("blob not created", keyFile1user3.exists());
-
-    LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
-    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
-    LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
-    assertEquals("local resource set size wrong", 1, lrsrcSet2.getSize());
-    LocalizedResourceSet lrsrcSet3 = localizer.getUserResources().get(user3);
-    assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
-
-    localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
-    // should remove key1
-    localizer.handleCacheCleanup();
-
-    lrsrcSet = localizer.getUserResources().get(user1);
-    lrsrcSet3 = localizer.getUserResources().get(user3);
-    assertNull("user set should be null", lrsrcSet);
-    assertFalse("blob dir not deleted", new File(expectedFileDirUser1).exists());
-    assertFalse("blob dir not deleted", new File(expectedUserDir1).exists());
-    assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
-
-    assertTrue("blob deleted", keyFile2.exists());
-    assertFalse("blob not deleted", keyFile.exists());
-    assertTrue("blob deleted", keyFile3.exists());
-    assertTrue("blob deleted", keyFile1user3.exists());
-  }
-
-  @Test
-  public void testUpdate() throws Exception {
-    Map<String, Object> conf = new HashMap();
-    // set clean time really high so doesn't kick in
-    conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
-
-    String key1 = "key1";
-    String topo1 = "topo1";
-    String topo2 = "topo2";
-    Localizer localizer = new TestLocalizer(conf, baseDir.toString());
-
-    ReadableBlobMeta rbm = new ReadableBlobMeta();
-    rbm.set_version(1);
-    rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
-    when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
-    when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
-
-    File user1Dir = localizer.getLocalUserFileCacheDir(user1);
-    assertTrue("failed to create user dir", user1Dir.mkdirs());
-    LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
-        user1Dir);
-
-    String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
-    String expectedFileDir = joinPath(expectedUserDir, Localizer.FILECACHE, Localizer.FILESDIR);
-    assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
-    File keyFile = new File(expectedFileDir, key1);
-    File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-    assertTrue("blob not created", keyFileCurrentSymlink.exists());
-    File versionFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_BLOB_VERSION_SUFFIX);
-    assertTrue("blob version file not created", versionFile.exists());
-    assertEquals("blob version not correct", 1, ServerUtils.localVersionOfBlob(keyFile.toString()));
-
-    LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
-    assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
-
-    // test another topology getting blob with updated version - it should update version now
-    rbm.set_version(2);
-
-    localizer.getBlob(new LocalResource(key1, false), user1, topo2, user1Dir);
-    assertTrue("blob version file not created", versionFile.exists());
-    assertEquals("blob version not correct", 2, ServerUtils.localVersionOfBlob(keyFile.toString()));
-    assertTrue("blob file with version 2 not created", new File(keyFile + ".2").exists());
-
-    // now test regular updateBlob
-    rbm.set_version(3);
-
-    ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
-    arr.add(new LocalResource(key1, false));
-    localizer.updateBlobs(arr, user1);
-    assertTrue("blob version file not created", versionFile.exists());
-    assertEquals("blob version not correct", 3, ServerUtils.localVersionOfBlob(keyFile.toString()));
-    assertTrue("blob file with version 3 not created", new File(keyFile + ".3").exists());
-  }
-}


[6/9] storm git commit: [MINOR] fix typos in LogviewerServer

Posted by ka...@apache.org.
[MINOR] fix typos in LogviewerServer

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b1f98c54
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b1f98c54
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b1f98c54

Branch: refs/heads/master
Commit: b1f98c5442cb402cc701514fa80c213135117273
Parents: db510ae
Author: Meng Li (Ethan) <et...@gmail.com>
Authored: Tue Sep 26 14:30:55 2017 -0500
Committer: GitHub <no...@github.com>
Committed: Tue Sep 26 14:30:55 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/storm/daemon/logviewer/LogviewerServer.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b1f98c54/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
index 0802015..87e7925 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@ -54,7 +54,7 @@ import org.slf4j.LoggerFactory;
  */
 public class LogviewerServer implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(LogviewerServer.class);
-    private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
+    private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("logviewer:num-shutdown-calls");
     public static final String STATIC_RESOURCE_DIRECTORY_PATH = "./public";
 
     private static Server mkHttpServer(Map<String, Object> conf) {


[8/9] storm git commit: Merge branch 'STORM-2758' of https://github.com/Ethanlm/storm into STORM-2758-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2758' of https://github.com/Ethanlm/storm into STORM-2758-merge

* fixed merge conflict by Jungtaek Lim <ka...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/99969a5b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/99969a5b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/99969a5b

Branch: refs/heads/master
Commit: 99969a5bf259c3fd0dc3ee12cc5380ed87896d18
Parents: ddea2ac ec77f66
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Sep 28 16:19:48 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Sep 28 16:19:48 2017 +0900

----------------------------------------------------------------------
 .../java/org/apache/storm/daemon/logviewer/LogviewerServer.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/99969a5b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
----------------------------------------------------------------------
diff --cc storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
index 87e7925,5bceb21..77f939f
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@@ -54,8 -54,9 +54,9 @@@ import org.slf4j.LoggerFactory
   */
  public class LogviewerServer implements AutoCloseable {
      private static final Logger LOG = LoggerFactory.getLogger(LogviewerServer.class);
 -    private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
 +    private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("logviewer:num-shutdown-calls");
-     public static final String STATIC_RESOURCE_DIRECTORY_PATH = "./public";
+     private static final String stormHome = System.getProperty("storm.home");
+     public static final String STATIC_RESOURCE_DIRECTORY_PATH = stormHome + "/public";
  
      private static Server mkHttpServer(Map<String, Object> conf) {
          Integer logviewerHttpPort = (Integer) conf.get(DaemonConfig.LOGVIEWER_PORT);


[9/9] storm git commit: Merge branch 'STORM-2084' of https://github.com/revans2/incubator-storm into STORM-2084-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2084' of https://github.com/revans2/incubator-storm into STORM-2084-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5c05bb74
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5c05bb74
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5c05bb74

Branch: refs/heads/master
Commit: 5c05bb74cbef88dfd63ab212183c0494b9fcc985
Parents: 99969a5 78cb243
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Sep 28 16:20:32 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Sep 28 16:20:32 2017 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                              |    2 +-
 .../src/jvm/org/apache/storm/StormTimer.java    |    2 +-
 .../org/apache/storm/testing4j_test.clj         |   44 +-
 .../daemon/supervisor/ReadClusterState.java     |    4 +-
 .../apache/storm/daemon/supervisor/Slot.java    |   24 +-
 .../storm/daemon/supervisor/Supervisor.java     |   53 +-
 .../daemon/supervisor/SupervisorUtils.java      |   33 -
 .../daemon/supervisor/timer/UpdateBlobs.java    |  111 --
 .../org/apache/storm/event/EventManagerImp.java |    2 +-
 .../apache/storm/localizer/AsyncLocalizer.java  | 1030 +++++++++++++++---
 .../org/apache/storm/localizer/ILocalizer.java  |   70 --
 .../localizer/LocalDownloadedResource.java      |   42 +-
 .../org/apache/storm/localizer/Localizer.java   |  695 ------------
 .../org/apache/storm/utils/ServerUtils.java     |   18 +-
 .../storm/daemon/supervisor/SlotTest.java       |   24 +-
 .../storm/localizer/AsyncLocalizerTest.java     |  699 +++++++++++-
 .../apache/storm/localizer/LocalizerTest.java   |  682 ------------
 17 files changed, 1627 insertions(+), 1908 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5c05bb74/conf/defaults.yaml
----------------------------------------------------------------------


[4/9] storm git commit: STORM-2084: Refactor localization to combine files together

Posted by ka...@apache.org.
STORM-2084: Refactor localization to combine files together


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/78cb243c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/78cb243c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/78cb243c

Branch: refs/heads/master
Commit: 78cb243c4bc9aaeaffd6f1c76915ac20016b32e7
Parents: 66ff5fd
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 15 13:29:40 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Sep 21 15:59:24 2017 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |    2 +-
 .../src/jvm/org/apache/storm/StormTimer.java    |    2 +-
 .../org/apache/storm/testing4j_test.clj         |   44 +-
 .../daemon/supervisor/ReadClusterState.java     |    4 +-
 .../apache/storm/daemon/supervisor/Slot.java    |   24 +-
 .../storm/daemon/supervisor/Supervisor.java     |   53 +-
 .../daemon/supervisor/SupervisorUtils.java      |   33 -
 .../daemon/supervisor/timer/UpdateBlobs.java    |  111 --
 .../org/apache/storm/event/EventManagerImp.java |    2 +-
 .../apache/storm/localizer/AsyncLocalizer.java  | 1030 +++++++++++++++---
 .../org/apache/storm/localizer/ILocalizer.java  |   70 --
 .../localizer/LocalDownloadedResource.java      |   42 +-
 .../org/apache/storm/localizer/Localizer.java   |  695 ------------
 .../org/apache/storm/utils/ServerUtils.java     |   18 +-
 .../storm/daemon/supervisor/SlotTest.java       |   24 +-
 .../storm/localizer/AsyncLocalizerTest.java     |  699 +++++++++++-
 .../apache/storm/localizer/LocalizerTest.java   |  682 ------------
 17 files changed, 1627 insertions(+), 1908 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index c6ef390..679a74b 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -123,7 +123,7 @@ supervisor.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore"
 supervisor.blobstore.download.thread.count: 5
 supervisor.blobstore.download.max_retries: 3
 supervisor.localizer.cache.target.size.mb: 10240
-supervisor.localizer.cleanup.interval.ms: 600000
+supervisor.localizer.cleanup.interval.ms: 30000
 
 nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore"
 nimbus.blobstore.expiration.secs: 600

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-client/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormTimer.java b/storm-client/src/jvm/org/apache/storm/StormTimer.java
index 4f6a7d5..b2e2b4a 100644
--- a/storm-client/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-client/src/jvm/org/apache/storm/StormTimer.java
@@ -225,7 +225,7 @@ public class StormTimer implements AutoCloseable {
      */
 
     @Override
-    public void close() throws Exception {
+    public void close() throws InterruptedException {
         if (this.task.isActive()) {
             this.task.setActive(false);
             this.task.interrupt();

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index 87e1fc0..1b12928 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -65,13 +65,13 @@
   (reify TestJob
        (^void run [this ^ILocalCluster cluster]
          (let [topology (Thrift/buildTopology
-                         {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+                         {"spout" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
                          {"2" (Thrift/prepareBoltDetails
-                                {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+                                {(GlobalStreamId. "spout" Utils/DEFAULT_STREAM_ID)
                                  (Thrift/prepareFieldsGrouping ["word"])}
                                 (TestWordCounter.) (Integer. 4))
                           "3" (Thrift/prepareBoltDetails
-                                {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+                                {(GlobalStreamId. "spout" Utils/DEFAULT_STREAM_ID)
                                  (Thrift/prepareGlobalGrouping)}
                                 (TestGlobalCount.))
                           "4" (Thrift/prepareBoltDetails
@@ -79,7 +79,7 @@
                                  (Thrift/prepareGlobalGrouping)}
                                 (TestAggregatesCounter.))})
                mocked-sources (doto (MockedSources.)
-                                (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
+                                (.addMockData "spout" (into-array Values [(Values. (into-array ["nathan"]))
                                                                       (Values. (into-array ["bob"]))
                                                                       (Values. (into-array ["joey"]))
                                                                       (Values. (into-array ["nathan"]))])
@@ -93,7 +93,7 @@
                                                  topology
                                                  complete-topology-param)]
            (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
-                           (Testing/readTuples results "1")))
+                           (Testing/readTuples results "spout")))
            (is (Testing/multiseteq [["nathan" (int 1)] ["nathan" (int 2)] ["bob" (int 1)] ["joey" (int 1)]]
                            (Testing/readTuples results "2")))
            (is (= [[1] [2] [3] [4]]
@@ -102,18 +102,36 @@
                   (Testing/readTuples results "4")))
            ))))
 
-(deftest test-complete-topology
-  (doseq [zmq-on? [true false]
-          :let [daemon-conf (doto (Config.)
-                              (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
-                mk-cluster-param (doto (MkClusterParam.)
-                                   (.setSupervisors (int 4))
-                                   (.setDaemonConf daemon-conf))]]
+(deftest test-complete-topology-netty-simulated
+  (let [daemon-conf (doto (Config.)
+                            (.put STORM-LOCAL-MODE-ZMQ true))
+        mk-cluster-param (doto (MkClusterParam.)
+                                 (.setSupervisors (int 4))
+                                 (.setDaemonConf daemon-conf))]
     (Testing/withSimulatedTimeLocalCluster
-      mk-cluster-param complete-topology-testjob )
+      mk-cluster-param complete-topology-testjob)))
+
+(deftest test-complete-topology-netty
+  (let [daemon-conf (doto (Config.)
+                            (.put STORM-LOCAL-MODE-ZMQ true))
+        mk-cluster-param (doto (MkClusterParam.)
+                                 (.setSupervisors (int 4))
+                                 (.setDaemonConf daemon-conf))]
     (Testing/withLocalCluster
       mk-cluster-param complete-topology-testjob)))
 
+(deftest test-complete-topology-local
+  (let [mk-cluster-param (doto (MkClusterParam.)
+                                 (.setSupervisors (int 4)))]
+    (Testing/withLocalCluster
+      mk-cluster-param complete-topology-testjob)))
+
+(deftest test-complete-topology-local-simulated
+  (let [mk-cluster-param (doto (MkClusterParam.)
+                                 (.setSupervisors (int 4)))]
+    (Testing/withSimulatedTimeLocalCluster
+      mk-cluster-param complete-topology-testjob)))
+
 (deftest test-with-tracked-cluster
   (Testing/withTrackedCluster
    (reify TestJob

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index e346a09..d68e512 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -41,7 +41,7 @@ import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.generated.ProfileRequest;
 import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.AsyncLocalizer;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Time;
@@ -60,7 +60,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
     private final AtomicInteger readRetry = new AtomicInteger(0);
     private final String assignmentId;
     private final ISupervisor iSuper;
-    private final ILocalizer localizer;
+    private final AsyncLocalizer localizer;
     private final ContainerLauncher launcher;
     private final String host;
     private final LocalState localState;

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index d221b71..6533d15 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -43,7 +43,7 @@ import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileAction;
 import org.apache.storm.generated.ProfileRequest;
-import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.AsyncLocalizer;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
@@ -79,7 +79,7 @@ public class Slot extends Thread implements AutoCloseable {
     }
     
     static class StaticState {
-        public final ILocalizer localizer;
+        public final AsyncLocalizer localizer;
         public final long hbTimeoutMs;
         public final long firstHbTimeoutMs;
         public final long killSleepMs;
@@ -90,10 +90,10 @@ public class Slot extends Thread implements AutoCloseable {
         public final ISupervisor iSupervisor;
         public final LocalState localState;
         
-        StaticState(ILocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
-                long killSleepMs, long monitorFreqMs,
-                ContainerLauncher containerLauncher, String host, int port,
-                ISupervisor iSupervisor, LocalState localState) {
+        StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
+                    long killSleepMs, long monitorFreqMs,
+                    ContainerLauncher containerLauncher, String host, int port,
+                    ISupervisor iSupervisor, LocalState localState) {
             this.localizer = localizer;
             this.hbTimeoutMs = hbTimeoutMs;
             this.firstHbTimeoutMs = firstHbTimeoutMs;
@@ -684,12 +684,12 @@ public class Slot extends Thread implements AutoCloseable {
     private volatile DynamicState dynamicState;
     private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments;
     
-    public Slot(ILocalizer localizer, Map<String, Object> conf, 
-            ContainerLauncher containerLauncher, String host,
-            int port, LocalState localState,
-            IStormClusterState clusterState,
-            ISupervisor iSupervisor,
-            AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
+    public Slot(AsyncLocalizer localizer, Map<String, Object> conf,
+                ContainerLauncher containerLauncher, String host,
+                int port, LocalState localState,
+                IStormClusterState clusterState,
+                ISupervisor iSupervisor,
+                AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
         super("SLOT_"+port);
 
         this.cachedCurrentAssignments = cachedCurrentAssignments;

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 1f8d4c3..08d32f1 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 import java.io.File;
@@ -25,7 +26,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -39,19 +39,15 @@ import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.DaemonCommon;
 import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
 import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
-import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
 import org.apache.storm.event.EventManager;
 import org.apache.storm.event.EventManagerImp;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.localizer.AsyncLocalizer;
-import org.apache.storm.localizer.ILocalizer;
-import org.apache.storm.localizer.Localizer;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerConfigUtils;
-import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.LocalState;
@@ -78,8 +74,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
     private final StormTimer heartbeatTimer;
     private final StormTimer eventTimer;
-    private final StormTimer blobUpdateTimer;
-    private final Localizer localizer;
     private final AsyncLocalizer asyncLocalizer;
     private EventManager eventManager;
     private ReadClusterState readState;
@@ -110,10 +104,11 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             throw Utils.wrapInRuntime(e);
         }
 
+        this.currAssignment = new AtomicReference<>(new HashMap<>());
+
         try {
             this.localState = ServerConfigUtils.supervisorState(conf);
-            this.localizer = ServerUtils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
-            this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer);
+            this.asyncLocalizer = new AsyncLocalizer(conf, currAssignment, localState.getLocalAssignmentsMap());
         } catch (IOException e) {
             throw Utils.wrapInRuntime(e);
         }
@@ -126,13 +121,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             throw Utils.wrapInRuntime(e);
         }
 
-        this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>());
-
         this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
 
         this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
-
-        this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
     }
     
     public String getId() {
@@ -178,12 +169,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
         return currAssignment;
     }
-
-    public Localizer getLocalizer() {
-        return localizer;
-    }
     
-    ILocalizer getAsyncLocalizer() {
+    AsyncLocalizer getAsyncLocalizer() {
         return asyncLocalizer;
     }
     
@@ -199,8 +186,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         String path = ServerConfigUtils.supervisorTmpDir(conf);
         FileUtils.cleanDirectory(new File(path));
 
-        Localizer localizer = getLocalizer();
-
         SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
         hb.run();
         // should synchronize supervisor so it doesn't launch anything after being down (optimization)
@@ -209,36 +194,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
 
         this.eventManager = new EventManagerImp(false);
         this.readState = new ReadClusterState(this);
-        
-        Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf);
-        Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap();
-        if (portToAssignments != null) {
-            Map<String, LocalAssignment> assignments = new HashMap<>();
-            for (LocalAssignment la : localState.getLocalAssignmentsMap().values()) {
-                assignments.put(la.get_topology_id(), la);
-            }
-            for (String topoId : downloadedTopoIds) {
-                LocalAssignment la = assignments.get(topoId);
-                if (la != null) {
-                    SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner());
-                } else {
-                    LOG.warn("Could not find an owner for topo {}", topoId);
-                }
-            }
-        }
-        // do this after adding the references so we don't try to clean things being used
-        localizer.startCleaner();
 
-        UpdateBlobs updateBlobsThread = new UpdateBlobs(this);
+        asyncLocalizer.start();
 
         if ((Boolean) conf.get(DaemonConfig.SUPERVISOR_ENABLE)) {
             // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
             // to date even if callbacks don't all work exactly right
             eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));
 
-            // Blob update thread. Starts with 30 seconds delay, every 30 seconds
-            blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, eventManager));
-
             // supervisor health check
             eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
         }
@@ -282,15 +245,13 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             this.active = false;
             heartbeatTimer.close();
             eventTimer.close();
-            blobUpdateTimer.close();
             if (eventManager != null) {
                 eventManager.close();
             }
             if (readState != null) {
                 readState.close();
             }
-            asyncLocalizer.shutdown();
-            localizer.shutdown();
+            asyncLocalizer.close();
             getStormClusterState().disconnect();
         } catch (Exception e) {
             LOG.error("Error Shutting down", e);

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 09c2b5d..33574c3 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -19,9 +19,7 @@ package org.apache.storm.daemon.supervisor;
 
 import org.apache.storm.Config;
 import org.apache.storm.generated.LSWorkerHeartbeat;
-import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.localizer.LocalResource;
-import org.apache.storm.localizer.Localizer;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
@@ -33,14 +31,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public class SupervisorUtils {
 
@@ -95,34 +90,6 @@ public class SupervisorUtils {
         return localResourceList;
     }
 
-    /**
-     * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the
-     * cache on restart.
-     * 
-     * @param localizer
-     * @param stormId
-     * @param conf
-     */
-    static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf, String user) throws IOException {
-        Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
-        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
-        if (blobstoreMap != null) {
-            localizer.addReferences(localresources, user, topoName);
-        }
-    }
-
-    public static Set<String> readDownloadedTopologyIds(Map<String, Object> conf) throws IOException {
-        Set<String> stormIds = new HashSet<>();
-        String path = ConfigUtils.supervisorStormDistRoot(conf);
-        Collection<String> rets = ConfigUtils.readDirContents(path);
-        for (String ret : rets) {
-            stormIds.add(URLDecoder.decode(ret));
-        }
-        return stormIds;
-    }
-
     public static Collection<String> supervisorWorkerIds(Map<String, Object> conf) {
         String workerRoot = ConfigUtils.workerRoot(conf);
         return ConfigUtils.readDirContents(workerRoot);

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
deleted file mode 100644
index b5dbf57..0000000
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor.timer;
-
-import java.util.HashMap;
-import org.apache.storm.Config;
-import org.apache.storm.daemon.supervisor.Supervisor;
-import org.apache.storm.daemon.supervisor.SupervisorUtils;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.LocalAssignment;
-import org.apache.storm.localizer.LocalResource;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.NimbusLeaderNotFoundException;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The
- * Runnable is intended to be run periodically by a timer, created elsewhere.
- */
-public class UpdateBlobs implements Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);
-
-    private Supervisor supervisor;
-
-    public UpdateBlobs(Supervisor supervisor) {
-        this.supervisor = supervisor;
-    }
-
-    @Override
-    public void run() {
-        try {
-            Map<String, Object> conf = supervisor.getConf();
-            Set<String> downloadedStormIds = SupervisorUtils.readDownloadedTopologyIds(conf);
-            AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisor.getCurrAssignment();
-            Map<String, LocalAssignment> assignedStormIds = new HashMap<>();
-            for (LocalAssignment localAssignment : newAssignment.get().values()) {
-                assignedStormIds.put(localAssignment.get_topology_id(), localAssignment);
-            }
-            for (String stormId : downloadedStormIds) {
-                LocalAssignment la = assignedStormIds.get(stormId);
-                if (la != null) {
-                    if (la.get_owner() == null) {
-                        //We got a case where the local assignment is not up to date, no point in going on...
-                        LOG.warn("The blobs will not be updated for {} until the local assignment is updated...", stormId);
-                    } else {
-                        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
-                        LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
-                        updateBlobsForTopology(conf, stormId, supervisor.getLocalizer(), la.get_owner());
-                    }
-                }
-            }
-        } catch (Exception e) {
-            if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
-                LOG.error("Network error while updating blobs, will retry again later", e);
-            } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
-                LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
-            } else {
-                throw Utils.wrapInRuntime(e);
-            }
-        }
-    }
-
-    /**
-     * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded.
-     * 
-     * @param conf
-     * @param stormId
-     * @param localizer
-     * @throws IOException
-     */
-    private void updateBlobsForTopology(Map<String, Object> conf, String stormId, Localizer localizer, String user) throws IOException {
-        Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
-        try {
-            localizer.updateBlobs(localresources, user);
-        } catch (AuthorizationException authExp) {
-            LOG.error("AuthorizationException error", authExp);
-        } catch (KeyNotFoundException knf) {
-            LOG.error("KeyNotFoundException error", knf);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
index 0a64370..6b9d4f1 100644
--- a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
+++ b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
@@ -73,7 +73,7 @@ public class EventManagerImp implements EventManager {
         runner.start();
     }
 
-    public void proccessInc() {
+    private void proccessInc() {
         processed.incrementAndGet();
     }
 


[5/9] storm git commit: [STORM-2758] fix: logviewer_search page not found

Posted by ka...@apache.org.
[STORM-2758] fix: logviewer_search page not found


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ec77f66c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ec77f66c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ec77f66c

Branch: refs/heads/master
Commit: ec77f66c124e1bdb499b9b80a9778aaa2fa9f9af
Parents: db510ae
Author: Ethan Li <et...@gmail.com>
Authored: Tue Sep 26 13:32:58 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Tue Sep 26 13:42:57 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/storm/daemon/logviewer/LogviewerServer.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ec77f66c/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
index 0802015..5bceb21 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@ -55,7 +55,8 @@ import org.slf4j.LoggerFactory;
 public class LogviewerServer implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(LogviewerServer.class);
     private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
-    public static final String STATIC_RESOURCE_DIRECTORY_PATH = "./public";
+    private static final String stormHome = System.getProperty("storm.home");
+    public static final String STATIC_RESOURCE_DIRECTORY_PATH = stormHome + "/public";
 
     private static Server mkHttpServer(Map<String, Object> conf) {
         Integer logviewerHttpPort = (Integer) conf.get(DaemonConfig.LOGVIEWER_PORT);


[3/9] storm git commit: STORM-2084: Refactor localization to combine files together

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 204ab7a..913820c 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -8,121 +8,293 @@
  * with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.localizer;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.PrintWriter;
 import java.net.JarURLConnection;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
-import org.apache.storm.utils.ServerConfigUtils;
+import org.apache.storm.DaemonConfig;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.daemon.supervisor.AdvancedFSOps;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.streams.Pair;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusLeaderNotFoundException;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.ShellUtils;
 import org.apache.storm.utils.Utils;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
 
 /**
- * This is a wrapper around the Localizer class that provides the desired
- * async interface to Slot.
+ * Downloads and caches blobs locally.
  */
-public class AsyncLocalizer implements ILocalizer, Shutdownable {
-    /**
-     * A future that has already completed.
-     */
-    private static class AllDoneFuture implements Future<Void> {
+public class AsyncLocalizer implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
+    public static final String FILECACHE = "filecache";
+    public static final String USERCACHE = "usercache";
+    // sub directories to store either files or uncompressed archives respectively
+    public static final String FILESDIR = "files";
+    public static final String ARCHIVESDIR = "archives";
 
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            return false;
+    private static final String TO_UNCOMPRESS = "_tmp_";
+    private static final CompletableFuture<Void> ALL_DONE_FUTURE = new CompletableFuture<>();
+    static {
+        ALL_DONE_FUTURE.complete(null);
+    }
+
+    private static Set<String> readDownloadedTopologyIds(Map<String, Object> conf) throws IOException {
+        Set<String> stormIds = new HashSet<>();
+        String path = ConfigUtils.supervisorStormDistRoot(conf);
+        Collection<String> rets = ConfigUtils.readDirContents(path);
+        for (String ret : rets) {
+            stormIds.add(URLDecoder.decode(ret, "UTF-8"));
         }
+        return stormIds;
+    }
 
-        @Override
-        public boolean isCancelled() {
-            return false;
+    private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
+    private final boolean isLocalMode;
+    private final Map<String, LocalDownloadedResource> basicPending;
+    private final Map<String, LocalDownloadedResource> blobPending;
+    private final Map<String, Object> conf;
+    private final AdvancedFSOps fsOps;
+    private final boolean symlinksDisabled;
+
+    // track resources - user to resourceSet
+    private final ConcurrentMap<String, LocalizedResourceSet> userRsrc = new ConcurrentHashMap<>();
+
+    private final String localBaseDir;
+
+    private final int blobDownloadRetries;
+    private final ScheduledExecutorService execService;
+
+    // cleanup
+    private long cacheTargetSize;
+    private long cacheCleanupPeriod;
+
+
+    public AsyncLocalizer(Map<String, Object> conf, AtomicReference<Map<Long, LocalAssignment>> currAssignment,
+                          Map<Integer, LocalAssignment> portToAssignments) throws IOException {
+        this(conf, ConfigUtils.supervisorLocalDir(conf), AdvancedFSOps.make(conf), currAssignment, portToAssignments);
+    }
+    
+    @VisibleForTesting
+    AsyncLocalizer(Map<String, Object> conf, String baseDir, AdvancedFSOps ops,
+                   AtomicReference<Map<Long, LocalAssignment>> currAssignment,
+                   Map<Integer, LocalAssignment> portToAssignments) throws IOException {
+
+        this.conf = conf;
+        isLocalMode = ConfigUtils.isLocalMode(conf);
+        fsOps = ops;
+        localBaseDir = baseDir;
+        // default cache size 10GB, converted to Bytes
+        cacheTargetSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB),
+            10 * 1024).longValue() << 20;
+        // default 30 seconds.
+        cacheCleanupPeriod = ObjectReader.getInt(conf.get(
+            DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 1000).longValue();
+
+        // if we needed we could make config for update thread pool size
+        int threadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
+        blobDownloadRetries = ObjectReader.getInt(conf.get(
+            DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
+
+        execService = Executors.newScheduledThreadPool(threadPoolSize,
+            new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor").build());
+        reconstructLocalizedResources();
+
+        symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
+        basicPending = new HashMap<>();
+        blobPending = new HashMap<>();
+        this.currAssignment = currAssignment;
+
+        recoverBlobReferences(portToAssignments);
+    }
+
+    /**
+     * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the
+     * cache on restart.
+     * @param topoId the topology id
+     * @param user the User that owns this topology
+     */
+    private void addBlobReferences(String topoId, String user) throws IOException {
+        Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, topoId);
+        @SuppressWarnings("unchecked")
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
+        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+        if (blobstoreMap != null) {
+            addReferences(localresources, user, topoName);
         }
+    }
 
-        @Override
-        public boolean isDone() {
-            return true;
+    /**
+     * Pick up where we left off last time.
+     * @param portToAssignments the current set of assignments for the supervisor.
+     */
+    private void recoverBlobReferences(Map<Integer, LocalAssignment> portToAssignments) throws IOException {
+        Set<String> downloadedTopoIds = readDownloadedTopologyIds(conf);
+        if (portToAssignments != null) {
+            Map<String, LocalAssignment> assignments = new HashMap<>();
+            for (LocalAssignment la : portToAssignments.values()) {
+                assignments.put(la.get_topology_id(), la);
+            }
+            for (String topoId : downloadedTopoIds) {
+                LocalAssignment la = assignments.get(topoId);
+                if (la != null) {
+                    addBlobReferences(topoId, la.get_owner());
+                } else {
+                    LOG.warn("Could not find an owner for topo {}", topoId);
+                }
+            }
         }
+    }
 
-        @Override
-        public Void get() {
-            return null;
+    /**
+     * Downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files
+     * with a suffix. The runnable is intended to be run periodically by a timer, created elsewhere.
+     */
+    private void updateBlobs() {
+        try {
+            Map<String, String> topoIdToOwner = currAssignment.get().values().stream()
+                .map((la) -> Pair.of(la.get_topology_id(), la.get_owner()))
+                .distinct()
+                .collect(Collectors.toMap((p) -> p.getFirst(), (p) -> p.getSecond()));
+            for (String topoId : readDownloadedTopologyIds(conf)) {
+                String owner = topoIdToOwner.get(topoId);
+                if (owner == null) {
+                    //We got a case where the local assignment is not up to date, no point in going on...
+                    LOG.warn("The blobs will not be updated for {} until the local assignment is updated...", topoId);
+                } else {
+                    String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, topoId);
+                    LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", topoId, stormRoot);
+                    updateBlobsForTopology(conf, topoId, owner);
+                }
+            }
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
+                LOG.error("Network error while updating blobs, will retry again later", e);
+            } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
+                LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
+            } else {
+                throw Utils.wrapInRuntime(e);
+            }
         }
+    }
 
-        @Override
-        public Void get(long timeout, TimeUnit unit) {
-            return null;
+    /**
+     * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded.
+     */
+    private void updateBlobsForTopology(Map<String, Object> conf, String stormId, String user)
+        throws IOException {
+        Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+        try {
+            updateBlobs(localresources, user);
+        } catch (AuthorizationException authExp) {
+            LOG.error("AuthorizationException error", authExp);
+        } catch (KeyNotFoundException knf) {
+            LOG.error("KeyNotFoundException error", knf);
         }
+    }
 
+    /**
+     * Start any background threads needed.  This includes updating blobs and cleaning up
+     * unused blobs over the configured size limit.
+     */
+    public void start() {
+        execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
+        execService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
     }
 
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
+    @Override
+    public void close() throws InterruptedException {
+        if (execService != null) {
+            execService.shutdown();
+        }
+    }
 
-    private final Localizer _localizer;
-    private final ExecutorService _execService;
-    private final boolean _isLocalMode;
-    private final Map<String, Object> _conf;
-    private final Map<String, LocalDownloadedResource> _basicPending;
-    private final Map<String, LocalDownloadedResource> _blobPending;
-    private final AdvancedFSOps _fsOps;
-    private final boolean _symlinksDisabled;
-
-    private class DownloadBaseBlobsDistributed implements Callable<Void> {
-        protected final String _topologyId;
-        protected final File _stormRoot;
-        protected final LocalAssignment _assignment;
+    //ILocalizer
+    private class DownloadBaseBlobsDistributed implements Supplier<Void> {
+        protected final String topologyId;
+        protected final File stormRoot;
+        protected final LocalAssignment assignment;
         protected final String owner;
-        
+
         public DownloadBaseBlobsDistributed(String topologyId, LocalAssignment assignment) throws IOException {
-            _topologyId = topologyId;
-            _stormRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf, _topologyId));
-            _assignment = assignment;
-	    owner = assignment.get_owner();
+            this.topologyId = topologyId;
+            stormRoot = new File(ConfigUtils.supervisorStormDistRoot(conf, this.topologyId));
+            this.assignment = assignment;
+            owner = assignment.get_owner();
         }
-        
+
         protected void downloadBaseBlobs(File tmproot) throws Exception {
-            String stormJarKey = ConfigUtils.masterStormJarKey(_topologyId);
-            String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId);
-            String topoConfKey = ConfigUtils.masterStormConfKey(_topologyId);
+            String stormJarKey = ConfigUtils.masterStormJarKey(topologyId);
+            String stormCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
+            String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
             String jarPath = ConfigUtils.supervisorStormJarPath(tmproot.getAbsolutePath());
             String codePath = ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath());
             String confPath = ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath());
-            _fsOps.forceMkdir(tmproot);
-            _fsOps.restrictDirectoryPermissions(tmproot);
-            ClientBlobStore blobStore = ServerUtils.getClientBlobStoreForSupervisor(_conf);
+            fsOps.forceMkdir(tmproot);
+            fsOps.restrictDirectoryPermissions(tmproot);
+            ClientBlobStore blobStore = ServerUtils.getClientBlobStoreForSupervisor(conf);
             try {
                 ServerUtils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore);
                 ServerUtils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore);
@@ -132,71 +304,71 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
             }
             ServerUtils.extractDirFromJar(jarPath, ServerConfigUtils.RESOURCES_SUBDIR, tmproot);
         }
-        
+
         @Override
-        public Void call() throws Exception {
+        public Void get() {
             try {
-                if (_fsOps.fileExists(_stormRoot)) {
-                    if (!_fsOps.supportsAtomicDirectoryMove()) {
-                        LOG.warn("{} may have partially downloaded blobs, recovering", _topologyId);
-                        _fsOps.deleteIfExists(_stormRoot);
+                if (fsOps.fileExists(stormRoot)) {
+                    if (!fsOps.supportsAtomicDirectoryMove()) {
+                        LOG.warn("{} may have partially downloaded blobs, recovering", topologyId);
+                        fsOps.deleteIfExists(stormRoot);
                     } else {
-                        LOG.warn("{} already downloaded blobs, skipping", _topologyId);
+                        LOG.warn("{} already downloaded blobs, skipping", topologyId);
                         return null;
                     }
                 }
                 boolean deleteAll = true;
-                String tmproot = ServerConfigUtils.supervisorTmpDir(_conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
+                String tmproot = ServerConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
                 File tr = new File(tmproot);
                 try {
                     downloadBaseBlobs(tr);
-                    if (_assignment.is_set_total_node_shared()) {
+                    if (assignment.is_set_total_node_shared()) {
                         File sharedMemoryDirTmpLocation = new File(tr, "shared_by_topology");
                         //We need to create a directory for shared memory to write to (we should not encourage this though)
                         Path path = sharedMemoryDirTmpLocation.toPath();
                         Files.createDirectories(path);
                     }
-                    _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
-                    _fsOps.setupStormCodeDir(owner, _stormRoot);
-                    if (_assignment.is_set_total_node_shared()) {
-                        File sharedMemoryDir = new File(_stormRoot, "shared_by_topology");
-                        _fsOps.setupWorkerArtifactsDir(owner, sharedMemoryDir);
+                    fsOps.moveDirectoryPreferAtomic(tr, stormRoot);
+                    fsOps.setupStormCodeDir(owner, stormRoot);
+                    if (assignment.is_set_total_node_shared()) {
+                        File sharedMemoryDir = new File(stormRoot, "shared_by_topology");
+                        fsOps.setupWorkerArtifactsDir(owner, sharedMemoryDir);
                     }
                     deleteAll = false;
                 } finally {
                     if (deleteAll) {
-                        LOG.warn("Failed to download basic resources for topology-id {}", _topologyId);
-                        _fsOps.deleteIfExists(tr);
-                        _fsOps.deleteIfExists(_stormRoot);
+                        LOG.warn("Failed to download basic resources for topology-id {}", topologyId);
+                        fsOps.deleteIfExists(tr);
+                        fsOps.deleteIfExists(stormRoot);
                     }
                 }
                 return null;
             } catch (Exception e) {
                 LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
-                throw e;
+                throw new RuntimeException(e);
             }
         }
     }
-    
+
     private class DownloadBaseBlobsLocal extends DownloadBaseBlobsDistributed {
 
         public DownloadBaseBlobsLocal(String topologyId, LocalAssignment assignment) throws IOException {
             super(topologyId, assignment);
         }
-        
+
         @Override
         protected void downloadBaseBlobs(File tmproot) throws Exception {
-            _fsOps.forceMkdir(tmproot);
-            String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId);
-            String topoConfKey = ConfigUtils.masterStormConfKey(_topologyId);
+            fsOps.forceMkdir(tmproot);
+            String stormCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
+            String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
             File codePath = new File(ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath()));
             File confPath = new File(ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath()));
-            BlobStore blobStore = ServerUtils.getNimbusBlobStore(_conf, null);
+            BlobStore blobStore = ServerUtils.getNimbusBlobStore(conf, null);
             try {
-                try (OutputStream codeOutStream = _fsOps.getOutputStream(codePath)){
+                try (OutputStream codeOutStream = fsOps.getOutputStream(codePath)){
                     blobStore.readBlobTo(stormCodeKey, codeOutStream, null);
                 }
-                try (OutputStream confOutStream = _fsOps.getOutputStream(confPath)) {
+                try (OutputStream confOutStream = fsOps.getOutputStream(confPath)) {
                     blobStore.readBlobTo(topoConfKey, confOutStream, null);
                 }
             } finally {
@@ -204,7 +376,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
             }
 
             ClassLoader classloader = Thread.currentThread().getContextClassLoader();
-            String resourcesJar = AsyncLocalizer.resourcesJar();
+            String resourcesJar = resourcesJar();
             URL url = classloader.getResource(ServerConfigUtils.RESOURCES_SUBDIR);
 
             String targetDir = tmproot + Utils.FILE_PATH_SEPARATOR;
@@ -217,26 +389,26 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
                     JarURLConnection urlConnection = (JarURLConnection) url.openConnection();
                     ServerUtils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), ServerConfigUtils.RESOURCES_SUBDIR, new File(targetDir));
                 } else {
-                    _fsOps.copyDirectory(new File(url.getFile()), new File(targetDir, ConfigUtils.RESOURCES_SUBDIR));
+                    fsOps.copyDirectory(new File(url.getFile()), new File(targetDir, ConfigUtils.RESOURCES_SUBDIR));
                 }
             }
         }
     }
-    
-    private class DownloadBlobs implements Callable<Void> {
-        private final String _topologyId;
+
+    private class DownloadBlobs implements Supplier<Void> {
+        private final String topologyId;
         private final String topoOwner;
 
         public DownloadBlobs(String topologyId, String topoOwner) {
-            _topologyId = topologyId;
+            this.topologyId = topologyId;
             this.topoOwner = topoOwner;
         }
 
         @Override
-        public Void call() throws Exception {
+        public Void get() {
             try {
-                String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
-                Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+                String stormroot = ConfigUtils.supervisorStormDistRoot(conf, topologyId);
+                Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, topologyId);
 
                 @SuppressWarnings("unchecked")
                 Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
@@ -250,7 +422,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
                     }
                 }
 
-                StormTopology stormCode = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _fsOps);
+                StormTopology stormCode = ConfigUtils.readSupervisorTopology(conf, topologyId, fsOps);
                 List<String> dependencies = new ArrayList<>();
                 if (stormCode.is_set_dependency_jars()) {
                     dependencies.addAll(stormCode.get_dependency_jars());
@@ -263,13 +435,13 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
                 }
 
                 if (!localResourceList.isEmpty()) {
-                    File userDir = _localizer.getLocalUserFileCacheDir(topoOwner);
-                    if (!_fsOps.fileExists(userDir)) {
-                        _fsOps.forceMkdir(userDir);
+                    File userDir = getLocalUserFileCacheDir(topoOwner);
+                    if (!fsOps.fileExists(userDir)) {
+                        fsOps.forceMkdir(userDir);
                     }
-                    List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, topoOwner, topoName, userDir);
-                    _fsOps.setupBlobPermissions(userDir, topoOwner);
-                    if (!_symlinksDisabled) {
+                    List<LocalizedResource> localizedResources = getBlobs(localResourceList, topoOwner, topoName, userDir);
+                    fsOps.setupBlobPermissions(userDir, topoOwner);
+                    if (!symlinksDisabled) {
                         for (LocalizedResource localizedResource : localizedResources) {
                             String keyName = localizedResource.getKey();
                             //The sym link we are pointing to
@@ -287,7 +459,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
                                 // all things are from dependencies
                                 symlinkName = keyName;
                             }
-                            _fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath);
+                            fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath);
                         }
                     }
                 }
@@ -295,45 +467,25 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
                 return null;
             } catch (Exception e) {
                 LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
-                throw e;
+                throw new RuntimeException(e);
             }
         }
     }
-    
-    //Visible for testing
-    AsyncLocalizer(Map<String, Object> conf, Localizer localizer, AdvancedFSOps ops) {
-        _conf = conf;
-        _symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
-        _isLocalMode = ConfigUtils.isLocalMode(conf);
-        _localizer = localizer;
-        _execService = Executors.newFixedThreadPool(1,  
-                new ThreadFactoryBuilder()
-                .setNameFormat("Async Localizer")
-                .build());
-        _basicPending = new HashMap<>();
-        _blobPending = new HashMap<>();
-        _fsOps = ops;
-    }
-    
-    public AsyncLocalizer(Map<String, Object> conf, Localizer localizer) {
-        this(conf, localizer, AdvancedFSOps.make(conf));
-    }
 
-    @Override
-    public synchronized Future<Void> requestDownloadBaseTopologyBlobs(final LocalAssignment assignment, final int port) throws IOException {
+    public synchronized CompletableFuture<Void> requestDownloadBaseTopologyBlobs(final LocalAssignment assignment, final int port) throws IOException {
         final String topologyId = assignment.get_topology_id();
-        LocalDownloadedResource localResource = _basicPending.get(topologyId);
+        LocalDownloadedResource localResource = basicPending.get(topologyId);
         if (localResource == null) {
-            Callable<Void> c;
-            if (_isLocalMode) {
-                c = new DownloadBaseBlobsLocal(topologyId, assignment);
+            Supplier<Void> supplier;
+            if (isLocalMode) {
+                supplier = new DownloadBaseBlobsLocal(topologyId, assignment);
             } else {
-                c = new DownloadBaseBlobsDistributed(topologyId, assignment);
+                supplier = new DownloadBaseBlobsDistributed(topologyId, assignment);
             }
-            localResource = new LocalDownloadedResource(_execService.submit(c));
-            _basicPending.put(topologyId, localResource);
+            localResource = new LocalDownloadedResource(CompletableFuture.supplyAsync(supplier, execService));
+            basicPending.put(topologyId, localResource);
         }
-        Future<Void> ret = localResource.reserve(port, assignment);
+        CompletableFuture<Void> ret = localResource.reserve(port, assignment);
         LOG.debug("Reserved basic {} {}", topologyId, localResource);
         return ret;
     }
@@ -343,7 +495,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
         if (path == null) {
             return null;
         }
-        
+
         for (String jpath : path.split(File.pathSeparator)) {
             if (jpath.endsWith(".jar")) {
                 if (ServerUtils.zipDoesContainDir(jpath, ServerConfigUtils.RESOURCES_SUBDIR)) {
@@ -353,63 +505,67 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
         }
         return null;
     }
-    
-    @Override
+
     public synchronized void recoverRunningTopology(LocalAssignment assignment, int port) {
         final String topologyId = assignment.get_topology_id();
-        LocalDownloadedResource localResource = _basicPending.get(topologyId);
+        LocalDownloadedResource localResource = basicPending.get(topologyId);
         if (localResource == null) {
-            localResource = new LocalDownloadedResource(new AllDoneFuture());
-            _basicPending.put(topologyId, localResource);
+            localResource = new LocalDownloadedResource(ALL_DONE_FUTURE);
+            basicPending.put(topologyId, localResource);
         }
         localResource.reserve(port, assignment);
         LOG.debug("Recovered basic {} {}", topologyId, localResource);
-        
-        localResource = _blobPending.get(topologyId);
+
+        localResource = blobPending.get(topologyId);
         if (localResource == null) {
-            localResource = new LocalDownloadedResource(new AllDoneFuture());
-            _blobPending.put(topologyId, localResource);
+            localResource = new LocalDownloadedResource(ALL_DONE_FUTURE);
+            blobPending.put(topologyId, localResource);
         }
         localResource.reserve(port, assignment);
         LOG.debug("Recovered blobs {} {}", topologyId, localResource);
     }
-    
-    @Override
-    public synchronized Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port) {
+
+    public synchronized CompletableFuture<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port) {
         final String topologyId = assignment.get_topology_id();
-        LocalDownloadedResource localResource = _blobPending.get(topologyId);
+        LocalDownloadedResource localResource = blobPending.get(topologyId);
         if (localResource == null) {
-            Callable<Void> c = new DownloadBlobs(topologyId, assignment.get_owner());
-            localResource = new LocalDownloadedResource(_execService.submit(c));
-            _blobPending.put(topologyId, localResource);
+            Supplier<Void> supplier = new DownloadBlobs(topologyId, assignment.get_owner());
+            localResource = new LocalDownloadedResource(CompletableFuture.supplyAsync(supplier, execService));
+            blobPending.put(topologyId, localResource);
         }
-        Future<Void> ret = localResource.reserve(port, assignment);
+        CompletableFuture<Void> ret = localResource.reserve(port, assignment);
         LOG.debug("Reserved blobs {} {}", topologyId, localResource);
         return ret;
     }
 
-    @Override
+    /**
+     * Stop this assignment/port from blocking its resources from being cleaned up.
+     *
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running on
+     * @throws IOException on any error
+     */
     public synchronized void releaseSlotFor(LocalAssignment assignment, int port) throws IOException {
         final String topologyId = assignment.get_topology_id();
         LOG.debug("Releasing slot for {} {}", topologyId, port);
-        LocalDownloadedResource localResource = _blobPending.get(topologyId);
+        LocalDownloadedResource localResource = blobPending.get(topologyId);
         if (localResource == null || !localResource.release(port, assignment)) {
             LOG.warn("Released blob reference {} {} for something that we didn't have {}", topologyId, port, localResource);
         } else if (localResource.isDone()){
             LOG.info("Released blob reference {} {} Cleaning up BLOB references...", topologyId, port);
-            _blobPending.remove(topologyId);
-            Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, topologyId);
+            blobPending.remove(topologyId);
+            Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, topologyId);
             @SuppressWarnings("unchecked")
             Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
             if (blobstoreMap != null) {
                 String user = assignment.get_owner();
                 String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
-                
+
                 for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
                     String key = entry.getKey();
                     Map<String, Object> blobInfo = entry.getValue();
                     try {
-                        _localizer.removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo));
+                        removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo));
                     } catch (Exception e) {
                         throw new IOException(e);
                     }
@@ -418,37 +574,585 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
         } else {
             LOG.debug("Released blob reference {} {} still waiting on {}", topologyId, port, localResource);
         }
-        
-        localResource = _basicPending.get(topologyId);
+
+        localResource = basicPending.get(topologyId);
         if (localResource == null || !localResource.release(port, assignment)) {
             LOG.warn("Released basic reference {} {} for something that we didn't have {}", topologyId, port, localResource);
         } else if (localResource.isDone()){
             LOG.info("Released blob reference {} {} Cleaning up basic files...", topologyId, port);
-            _basicPending.remove(topologyId);
-            String path = ConfigUtils.supervisorStormDistRoot(_conf, topologyId);
-            _fsOps.deleteIfExists(new File(path), null, "rmr "+topologyId);
+            basicPending.remove(topologyId);
+            String path = ConfigUtils.supervisorStormDistRoot(conf, topologyId);
+            fsOps.deleteIfExists(new File(path), null, "rmr "+topologyId);
         } else {
             LOG.debug("Released basic reference {} {} still waiting on {}", topologyId, port, localResource);
         }
     }
 
-    @Override
     public synchronized void cleanupUnusedTopologies() throws IOException {
-        File distRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf));
+        File distRoot = new File(ConfigUtils.supervisorStormDistRoot(conf));
         LOG.info("Cleaning up unused topologies in {}", distRoot);
         File[] children = distRoot.listFiles();
         if (children != null) {
             for (File topoDir : children) {
                 String topoId = URLDecoder.decode(topoDir.getName(), "UTF-8");
-                if (_basicPending.get(topoId) == null && _blobPending.get(topoId) == null) {
-                    _fsOps.deleteIfExists(topoDir, null, "rmr " + topoId);
+                if (basicPending.get(topoId) == null && blobPending.get(topoId) == null) {
+                    fsOps.deleteIfExists(topoDir, null, "rmr " + topoId);
                 }
             }
         }
     }
 
-    @Override
-    public void shutdown() {
-        _execService.shutdown();
+    //From Localizer
+
+    // For testing, it allows setting size in bytes
+    protected void setTargetCacheSize(long size) {
+        cacheTargetSize = size;
+    }
+
+    // For testing, be careful as it doesn't clone
+    ConcurrentMap<String, LocalizedResourceSet> getUserResources() {
+        return userRsrc;
+    }
+
+    // baseDir/supervisor/usercache/
+    protected File getUserCacheDir() {
+        return new File(localBaseDir, USERCACHE);
+    }
+
+    // baseDir/supervisor/usercache/user1/
+    protected File getLocalUserDir(String userName) {
+        return new File(getUserCacheDir(), userName);
+    }
+
+    // baseDir/supervisor/usercache/user1/filecache
+    public File getLocalUserFileCacheDir(String userName) {
+        return new File(getLocalUserDir(userName), FILECACHE);
+    }
+
+    // baseDir/supervisor/usercache/user1/filecache/files
+    protected File getCacheDirForFiles(File dir) {
+        return new File(dir, FILESDIR);
+    }
+
+    // get the directory to put uncompressed archives in
+    // baseDir/supervisor/usercache/user1/filecache/archives
+    protected File getCacheDirForArchives(File dir) {
+        return new File(dir, ARCHIVESDIR);
+    }
+
+    protected void addLocalizedResourceInDir(String dir, LocalizedResourceSet lrsrcSet,
+                                             boolean uncompress) {
+        File[] lrsrcs = readCurrentBlobs(dir);
+
+        if (lrsrcs != null) {
+            for (File rsrc : lrsrcs) {
+                LOG.info("add localized in dir found: " + rsrc);
+                // strip off the trailing .suffix
+                String path = rsrc.getPath();
+                int p = path.lastIndexOf('.');
+                if (p > 0) {
+                    path = path.substring(0, p);
+                }
+                LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path);
+                LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path,
+                    uncompress);
+                lrsrcSet.add(lrsrc.getKey(), lrsrc, uncompress);
+            }
+        }
+    }
+
+    // Looks for files in the directory with .current suffix
+    protected File[] readCurrentBlobs(String location) {
+        File dir = new File(location);
+        File[] files = null;
+        if (dir.exists()) {
+            files = dir.listFiles(new FilenameFilter() {
+                @Override
+                public boolean accept(File dir, String name) {
+                    return name.toLowerCase().endsWith(ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+                }
+            });
+        }
+        return files;
+    }
+
+    // Check to see if there are any existing files already localized.
+    protected void reconstructLocalizedResources() {
+        try {
+            LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath());
+            Collection<File> users = ConfigUtils.readDirFiles(getUserCacheDir().getPath());
+            if (!(users == null || users.isEmpty())) {
+                for (File userDir : users) {
+                    String user = userDir.getName();
+                    LOG.debug("looking in: {} for user: {}", userDir.getPath(), user);
+                    LocalizedResourceSet newSet = new LocalizedResourceSet(user);
+                    LocalizedResourceSet lrsrcSet = userRsrc.putIfAbsent(user, newSet);
+                    if (lrsrcSet == null) {
+                        lrsrcSet = newSet;
+                    }
+                    addLocalizedResourceInDir(getCacheDirForFiles(getLocalUserFileCacheDir(user)).getPath(),
+                        lrsrcSet, false);
+                    addLocalizedResourceInDir(
+                        getCacheDirForArchives(getLocalUserFileCacheDir(user)).getPath(),
+                        lrsrcSet, true);
+                }
+            } else {
+                LOG.warn("No left over resources found for any user during reconstructing of local resources at: {}", getUserCacheDir().getPath());
+            }
+        } catch (Exception e) {
+            LOG.error("ERROR reconstructing localized resources", e);
+        }
+    }
+
+    // ignores invalid user/topo/key
+    public synchronized void removeBlobReference(String key, String user, String topo,
+                                                 boolean uncompress) throws AuthorizationException, KeyNotFoundException {
+        LocalizedResourceSet lrsrcSet = userRsrc.get(user);
+        if (lrsrcSet != null) {
+            LocalizedResource lrsrc = lrsrcSet.get(key, uncompress);
+            if (lrsrc != null) {
+                LOG.debug("removing blob reference to: {} for topo: {}", key, topo);
+                lrsrc.removeReference(topo);
+            } else {
+                LOG.warn("trying to remove non-existent blob, key: " + key + " for user: " + user +
+                    " topo: " + topo);
+            }
+        } else {
+            LOG.warn("trying to remove blob for non-existent resource set for user: " + user + " key: "
+                + key + " topo: " + topo);
+        }
+    }
+
+    public synchronized void addReferences(List<LocalResource> localresource, String user,
+                                           String topo) {
+        LocalizedResourceSet lrsrcSet = userRsrc.get(user);
+        if (lrsrcSet != null) {
+            for (LocalResource blob : localresource) {
+                LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName(), blob.shouldUncompress());
+                if (lrsrc != null) {
+                    lrsrc.addReference(topo);
+                    LOG.debug("added reference for topo: {} key: {}", topo, blob);
+                } else {
+                    LOG.warn("trying to add reference to non-existent blob, key: " + blob + " topo: " + topo);
+                }
+            }
+        } else {
+            LOG.warn("trying to add reference to non-existent local resource set, " +
+                "user: " + user + " topo: " + topo);
+        }
+    }
+
+    /**
+     * This function either returns the blob in the existing cache or if it doesn't exist in the
+     * cache, it will download the blob and will block until the download is complete.
+     */
+    public LocalizedResource getBlob(LocalResource localResource, String user, String topo,
+                                     File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException {
+        ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
+        arr.add(localResource);
+        List<LocalizedResource> results = getBlobs(arr, user, topo, userFileDir);
+        if (results.isEmpty() || results.size() != 1) {
+            throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + user +
+                ", topo: " + topo);
+        }
+        return results.get(0);
+    }
+
+    protected boolean isLocalizedResourceDownloaded(LocalizedResource lrsrc) {
+        File rsrcFileCurrent = new File(lrsrc.getCurrentSymlinkPath());
+        File rsrcFileWithVersion = new File(lrsrc.getFilePathWithVersion());
+        File versionFile = new File(lrsrc.getVersionFilePath());
+        return (rsrcFileWithVersion.exists() && rsrcFileCurrent.exists() && versionFile.exists());
+    }
+
+    protected boolean isLocalizedResourceUpToDate(LocalizedResource lrsrc,
+                                                  ClientBlobStore blobstore) throws AuthorizationException, KeyNotFoundException {
+        String localFile = lrsrc.getFilePath();
+        long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(lrsrc.getKey(), blobstore);
+        long currentBlobVersion = ServerUtils.localVersionOfBlob(localFile);
+        return (nimbusBlobVersion == currentBlobVersion);
+    }
+
+    protected ClientBlobStore getClientBlobStore() {
+        return ServerUtils.getClientBlobStoreForSupervisor(conf);
+    }
+
+    /**
+     * This function updates blobs on the supervisor. It uses a separate thread pool and runs
+     * asynchronously of the download and delete.
+     */
+    public List<LocalizedResource> updateBlobs(List<LocalResource> localResources,
+                                               String user) throws AuthorizationException, KeyNotFoundException, IOException {
+        LocalizedResourceSet lrsrcSet = userRsrc.get(user);
+        ArrayList<LocalizedResource> results = new ArrayList<>();
+        ArrayList<Callable<LocalizedResource>> updates = new ArrayList<>();
+
+        if (lrsrcSet == null) {
+            // resource set must have been removed
+            return results;
+        }
+        ClientBlobStore blobstore = null;
+        try {
+            blobstore = getClientBlobStore();
+            for (LocalResource localResource: localResources) {
+                String key = localResource.getBlobName();
+                LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
+                if (lrsrc == null) {
+                    LOG.warn("blob requested for update doesn't exist: {}", key);
+                    continue;
+                } else if ((boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
+                    LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
+                    continue;
+                } else {
+                    // update it if either the version isn't the latest or if any local blob files are missing
+                    if (!isLocalizedResourceUpToDate(lrsrc, blobstore) ||
+                        !isLocalizedResourceDownloaded(lrsrc)) {
+                        LOG.debug("updating blob: {}", key);
+                        updates.add(new DownloadBlob(this, conf, key, new File(lrsrc.getFilePath()), user,
+                            lrsrc.isUncompressed(), true));
+                    }
+                }
+            }
+        } finally {
+            if(blobstore != null) {
+                blobstore.shutdown();
+            }
+        }
+        try {
+            List<Future<LocalizedResource>> futures = execService.invokeAll(updates);
+            for (Future<LocalizedResource> futureRsrc : futures) {
+                try {
+                    LocalizedResource lrsrc = futureRsrc.get();
+                    // put the resource just in case it was removed at the same time by the cleaner
+                    LocalizedResourceSet newSet = new LocalizedResourceSet(user);
+                    LocalizedResourceSet newlrsrcSet = userRsrc.putIfAbsent(user, newSet);
+                    if (newlrsrcSet == null) {
+                        newlrsrcSet = newSet;
+                    }
+                    newlrsrcSet.putIfAbsent(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+                    results.add(lrsrc);
+                }
+                catch (ExecutionException e) {
+                    LOG.error("Error updating blob: ", e);
+                    if (e.getCause() instanceof AuthorizationException) {
+                        throw (AuthorizationException)e.getCause();
+                    }
+                    if (e.getCause() instanceof KeyNotFoundException) {
+                        throw (KeyNotFoundException)e.getCause();
+                    }
+                }
+            }
+        } catch (RejectedExecutionException re) {
+            LOG.error("Error updating blobs : ", re);
+        } catch (InterruptedException ie) {
+            throw new IOException("Interrupted Exception", ie);
+        }
+        return results;
+    }
+
+    /**
+     * This function either returns the blobs in the existing cache or if they don't exist in the
+     * cache, it downloads them in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT)
+     * and will block until all of them have been downloaded
+     */
+    public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources,
+                                                         String user, String topo, File userFileDir)
+        throws AuthorizationException, KeyNotFoundException, IOException {
+        if ((boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
+            throw new KeyNotFoundException("symlinks are disabled so blobs cannot be downloaded.");
+        }
+        LocalizedResourceSet newSet = new LocalizedResourceSet(user);
+        LocalizedResourceSet lrsrcSet = userRsrc.putIfAbsent(user, newSet);
+        if (lrsrcSet == null) {
+            lrsrcSet = newSet;
+        }
+        ArrayList<LocalizedResource> results = new ArrayList<>();
+        ArrayList<Callable<LocalizedResource>> downloads = new ArrayList<>();
+
+        ClientBlobStore blobstore = null;
+        try {
+            blobstore = getClientBlobStore();
+            for (LocalResource localResource: localResources) {
+                String key = localResource.getBlobName();
+                boolean uncompress = localResource.shouldUncompress();
+                LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
+                boolean isUpdate = false;
+                if ((lrsrc != null) && (lrsrc.isUncompressed() == localResource.shouldUncompress()) &&
+                    (isLocalizedResourceDownloaded(lrsrc))) {
+                    if (isLocalizedResourceUpToDate(lrsrc, blobstore)) {
+                        LOG.debug("blob already exists: {}", key);
+                        lrsrc.addReference(topo);
+                        results.add(lrsrc);
+                        continue;
+                    }
+                    LOG.debug("blob exists but isn't up to date: {}", key);
+                    isUpdate = true;
+                }
+
+                // go off to blobstore and get it
+                // assume dir passed in exists and has correct permission
+                LOG.debug("fetching blob: {}", key);
+                File downloadDir = getCacheDirForFiles(userFileDir);
+                File localFile = new File(downloadDir, key);
+                if (uncompress) {
+                    // for compressed file, download to archives dir
+                    downloadDir = getCacheDirForArchives(userFileDir);
+                    localFile = new File(downloadDir, key);
+                }
+                downloadDir.mkdir();
+                downloads.add(new DownloadBlob(this, conf, key, localFile, user, uncompress,
+                    isUpdate));
+            }
+        } finally {
+            if(blobstore !=null) {
+                blobstore.shutdown();
+            }
+        }
+        try {
+            List<Future<LocalizedResource>> futures = execService.invokeAll(downloads);
+            for (Future<LocalizedResource> futureRsrc: futures) {
+                LocalizedResource lrsrc = futureRsrc.get();
+                lrsrc.addReference(topo);
+                lrsrcSet.add(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+                results.add(lrsrc);
+            }
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof AuthorizationException)
+                throw (AuthorizationException)e.getCause();
+            else if (e.getCause() instanceof KeyNotFoundException) {
+                throw (KeyNotFoundException)e.getCause();
+            } else {
+                throw new IOException("Error getting blobs", e);
+            }
+        } catch (RejectedExecutionException re) {
+            throw new IOException("RejectedExecutionException: ", re);
+        } catch (InterruptedException ie) {
+            throw new IOException("Interrupted Exception", ie);
+        }
+        return results;
+    }
+
+    static class DownloadBlob implements Callable<LocalizedResource> {
+
+        private AsyncLocalizer localizer;
+        private Map conf;
+        private String key;
+        private File localFile;
+        private String user;
+        private boolean uncompress;
+        private boolean isUpdate;
+
+        public DownloadBlob(AsyncLocalizer localizer, Map<String, Object> conf, String key, File localFile,
+                            String user, boolean uncompress, boolean update) {
+            this.localizer = localizer;
+            this.conf = conf;
+            this.key = key;
+            this.localFile = localFile;
+            this.user = user;
+            this.uncompress = uncompress;
+            isUpdate = update;
+        }
+
+        @Override
+        public LocalizedResource call()
+            throws AuthorizationException, KeyNotFoundException, IOException  {
+            return localizer.downloadBlob(conf, key, localFile, user, uncompress,
+                isUpdate);
+        }
+    }
+
+    private LocalizedResource downloadBlob(Map<String, Object> conf, String key, File localFile,
+                                           String user, boolean uncompress, boolean isUpdate)
+        throws AuthorizationException, KeyNotFoundException, IOException {
+        ClientBlobStore blobstore = null;
+        try {
+            blobstore = getClientBlobStore();
+            long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(key, blobstore);
+            long oldVersion = ServerUtils.localVersionOfBlob(localFile.toString());
+            FileOutputStream out = null;
+            PrintWriter writer = null;
+            int numTries = 0;
+            String localizedPath = localFile.toString();
+            String localFileWithVersion = ServerUtils.constructBlobWithVersionFileName(localFile.toString(),
+                nimbusBlobVersion);
+            String localVersionFile = ServerUtils.constructVersionFileName(localFile.toString());
+            String downloadFile = localFileWithVersion;
+            if (uncompress) {
+                // we need to download to temp file and then unpack into the one requested
+                downloadFile = new File(localFile.getParent(), TO_UNCOMPRESS + localFile.getName()).toString();
+            }
+            while (numTries < blobDownloadRetries) {
+                out = new FileOutputStream(downloadFile);
+                numTries++;
+                try {
+                    if (!ServerUtils.canUserReadBlob(blobstore.getBlobMeta(key), user, conf)) {
+                        throw new AuthorizationException(user + " does not have READ access to " + key);
+                    }
+                    InputStreamWithMeta in = blobstore.getBlob(key);
+                    byte[] buffer = new byte[1024];
+                    int len;
+                    while ((len = in.read(buffer)) >= 0) {
+                        out.write(buffer, 0, len);
+                    }
+                    out.close();
+                    in.close();
+                    if (uncompress) {
+                        ServerUtils.unpack(new File(downloadFile), new File(localFileWithVersion));
+                        LOG.debug("uncompressed " + downloadFile + " to: " + localFileWithVersion);
+                    }
+
+                    // Next write the version.
+                    LOG.info("Blob: " + key + " updated with new Nimbus-provided version: " +
+                        nimbusBlobVersion + " local version was: " + oldVersion);
+                    // The false parameter ensures overwriting the version file, not appending
+                    writer = new PrintWriter(
+                        new BufferedWriter(new FileWriter(localVersionFile, false)));
+                    writer.println(nimbusBlobVersion);
+                    writer.close();
+
+                    try {
+                        setBlobPermissions(conf, user, localFileWithVersion);
+                        setBlobPermissions(conf, user, localVersionFile);
+
+                        // Update the key.current symlink. First create tmp symlink and do
+                        // move of tmp to current so that the operation is atomic.
+                        String tmp_uuid_local = java.util.UUID.randomUUID().toString();
+                        LOG.debug("Creating a symlink @" + localFile + "." + tmp_uuid_local + " , " +
+                            "linking to: " + localFile + "." + nimbusBlobVersion);
+                        File uuid_symlink = new File(localFile + "." + tmp_uuid_local);
+
+                        Files.createSymbolicLink(uuid_symlink.toPath(),
+                            Paths.get(ServerUtils.constructBlobWithVersionFileName(localFile.toString(),
+                                nimbusBlobVersion)));
+                        File current_symlink = new File(ServerUtils.constructBlobCurrentSymlinkName(
+                            localFile.toString()));
+                        Files.move(uuid_symlink.toPath(), current_symlink.toPath(), ATOMIC_MOVE);
+                    } catch (IOException e) {
+                        // if we fail after writing the version file but before we move current link we need to
+                        // restore the old version to the file
+                        try {
+                            PrintWriter restoreWriter = new PrintWriter(
+                                new BufferedWriter(new FileWriter(localVersionFile, false)));
+                            restoreWriter.println(oldVersion);
+                            restoreWriter.close();
+                        } catch (IOException ignore) {}
+                        throw e;
+                    }
+
+                    String oldBlobFile = localFile + "." + oldVersion;
+                    try {
+                        // Remove the old version. Note that if a number of processes have that file open,
+                        // the OS will keep the old blob file around until they all close the handle and only
+                        // then delete it. No new process will open the old blob, since the users will open the
+                        // blob through the "blob.current" symlink, which always points to the latest version of
+                        // a blob. Remove the old version after the current symlink is updated so as not to affect
+                        // anyone trying to read it.
+                        if ((oldVersion != -1) && (oldVersion != nimbusBlobVersion)) {
+                            LOG.info("Removing an old blob file:" + oldBlobFile);
+                            Files.delete(Paths.get(oldBlobFile));
+                        }
+                    } catch (IOException e) {
+                        // At this point we have downloaded everything and moved symlinks.  If removing the
+                        // old version fails, just log an error.
+                        LOG.error("Exception removing old blob version: " + oldBlobFile);
+                    }
+
+                    break;
+                } catch (AuthorizationException ae) {
+                    // we consider this a non-retriable exception
+                    if (out != null) {
+                        out.close();
+                    }
+                    new File(downloadFile).delete();
+                    throw ae;
+                } catch (IOException | KeyNotFoundException e) {
+                    if (out != null) {
+                        out.close();
+                    }
+                    if (writer != null) {
+                        writer.close();
+                    }
+                    new File(downloadFile).delete();
+                    if (uncompress) {
+                        try {
+                            FileUtils.deleteDirectory(new File(localFileWithVersion));
+                        } catch (IOException ignore) {}
+                    }
+                    if (!isUpdate) {
+                        // don't want to remove existing version file if it's an update
+                        new File(localVersionFile).delete();
+                    }
+
+                    if (numTries < blobDownloadRetries) {
+                        LOG.error("Failed to download blob, retrying", e);
+                    } else {
+                        throw e;
+                    }
+                }
+            }
+            return new LocalizedResource(key, localizedPath, uncompress);
+        } finally {
+            if(blobstore != null) {
+                blobstore.shutdown();
+            }
+        }
+    }
+
+    public void setBlobPermissions(Map<String, Object> conf, String user, String path)
+        throws IOException {
+
+        if (!ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            return;
+        }
+        String wlCommand = ObjectReader.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), "");
+        if (wlCommand.isEmpty()) {
+            String stormHome = System.getProperty("storm.home");
+            wlCommand = stormHome + "/bin/worker-launcher";
+        }
+        List<String> command = new ArrayList<String>(Arrays.asList(wlCommand, user, "blob", path));
+
+        String[] commandArray = command.toArray(new String[command.size()]);
+        ShellUtils.ShellCommandExecutor shExec = new ShellUtils.ShellCommandExecutor(commandArray);
+        LOG.info("Setting blob permissions, command: {}", Arrays.toString(commandArray));
+
+        try {
+            shExec.execute();
+            LOG.debug("output: {}", shExec.getOutput());
+        } catch (ShellUtils.ExitCodeException e) {
+            int exitCode = shExec.getExitCode();
+            LOG.warn("Exit code from worker-launcher is : " + exitCode, e);
+            LOG.debug("output: {}", shExec.getOutput());
+            throw new IOException("Setting blob permissions failed" +
+                " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
+        }
     }
+
+
+    public synchronized void cleanup() {
+        LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
+        // need one large set of all and then clean via LRU
+        for (LocalizedResourceSet t : userRsrc.values()) {
+            toClean.addResources(t);
+            LOG.debug("Resources to be cleaned after adding {} : {}", t.getUser(), toClean);
+        }
+        toClean.cleanup();
+        LOG.debug("Resource cleanup: {}", toClean);
+        for (LocalizedResourceSet t : userRsrc.values()) {
+            if (t.getSize() == 0) {
+                String user = t.getUser();
+
+                LOG.debug("removing empty set: {}", user);
+                File userFileCacheDir = getLocalUserFileCacheDir(user);
+                getCacheDirForFiles(userFileCacheDir).delete();
+                getCacheDirForArchives(userFileCacheDir).delete();
+                getLocalUserFileCacheDir(user).delete();
+                boolean dirsRemoved = getLocalUserDir(user).delete();
+                // to catch race with update thread
+                if (dirsRemoved) {
+                    userRsrc.remove(user);
+                }
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/localizer/ILocalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/ILocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/ILocalizer.java
deleted file mode 100644
index 7105095..0000000
--- a/storm-server/src/main/java/org/apache/storm/localizer/ILocalizer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.localizer;
-
-import java.io.IOException;
-import java.util.concurrent.Future;
-
-import org.apache.storm.generated.LocalAssignment;
-
-/**
- * Download blobs from the blob store and keep them up to date.
- */
-public interface ILocalizer {
-
-    /**
-     * Recover a running topology by incrementing references for what it has already downloaded.
-     * @param assignment the assignment the resources are for
-     * @param port the port the topology is running in.
-     */
-    void recoverRunningTopology(LocalAssignment assignemnt, int port);
-    
-    /**
-     * Download storm.jar, storm.conf, and storm.ser for this topology if not done so already,
-     * and inc a reference count on them.
-     * @param assignment the assignment the resources are for
-     * @param port the port the topology is running on
-     * @return a future to let you know when they are done.
-     * @throws IOException on error 
-     */
-    Future<Void> requestDownloadBaseTopologyBlobs(LocalAssignment assignment, int port) throws IOException;
-
-    /**
-     * Download the blobs for this topology (reading in list in from the config)
-     * and inc reference count for these blobs.
-     * PRECONDITION: requestDownloadBaseTopologyBlobs has completed downloading.
-     * @param assignment the assignment the resources are for
-     * @param port the port the topology is running on
-     * @return a future to let you know when they are done.
-     */
-    Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port);
-    
-    /**
-     * dec reference count on all blobs associated with this topology.
-     * @param assignment the assignment the resources are for
-     * @param port the port the topology is running on
-     * @throws IOException on any error
-     */
-    void releaseSlotFor(LocalAssignment assignment, int port) throws IOException;
-    
-    /**
-     * Clean up any topologies that are not in use right now.
-     * @throws IOException on any error.
-     */
-    void cleanupUnusedTopologies() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/localizer/LocalDownloadedResource.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalDownloadedResource.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalDownloadedResource.java
index 570c951..f019374 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocalDownloadedResource.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalDownloadedResource.java
@@ -19,6 +19,7 @@ package org.apache.storm.localizer;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -30,39 +31,6 @@ import org.slf4j.LoggerFactory;
 
 public class LocalDownloadedResource {
     private static final Logger LOG = LoggerFactory.getLogger(LocalDownloadedResource.class);
-    private static class NoCancelFuture<T> implements Future<T> {
-        private final Future<T> _wrapped;
-        
-        public NoCancelFuture(Future<T> wrapped) {
-            _wrapped = wrapped;
-        }
-        
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            //cancel not currently supported
-            return false;
-        }
-
-        @Override
-        public boolean isCancelled() {
-            return false;
-        }
-
-        @Override
-        public boolean isDone() {
-            return _wrapped.isDone();
-        }
-
-        @Override
-        public T get() throws InterruptedException, ExecutionException {
-            return _wrapped.get();
-        }
-
-        @Override
-        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-            return _wrapped.get(timeout, unit);
-        }
-    }
     private static class PortNAssignment {
         private final int _port;
         private final LocalAssignment _assignment;
@@ -91,13 +59,13 @@ public class LocalDownloadedResource {
             return "{"+ _port + " " + _assignment +"}";
         }
     }
-    private final Future<Void> _pending;
+    private final CompletableFuture<Void> _pending;
     private final Set<PortNAssignment> _references;
     private boolean _isDone;
     
     
-    public LocalDownloadedResource(Future<Void> pending) {
-        _pending = new NoCancelFuture<>(pending);
+    public LocalDownloadedResource(CompletableFuture<Void> pending) {
+        _pending = pending;
         _references = new HashSet<>();
         _isDone = false;
     }
@@ -108,7 +76,7 @@ public class LocalDownloadedResource {
      * @param la the assignment this is for
      * @return a future that can be used to track it being downloaded.
      */
-    public synchronized Future<Void> reserve(int port, LocalAssignment la) {
+    public synchronized CompletableFuture<Void> reserve(int port, LocalAssignment la) {
         PortNAssignment pna = new PortNAssignment(port, la);
         if (!_references.add(pna)) {
             LOG.warn("Resources {} already reserved {} for this topology", pna, _references);


[2/9] storm git commit: STORM-2084: Refactor localization to combine files together

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java b/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
deleted file mode 100644
index 353ab56..0000000
--- a/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
+++ /dev/null
@@ -1,695 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.localizer;
-
-import org.apache.storm.Config;
-import org.apache.storm.DaemonConfig;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.ServerUtils;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.ShellUtils.ExitCodeException;
-import org.apache.storm.utils.ShellUtils.ShellCommandExecutor;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Collection;
-import java.util.ArrayList;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-
-/**
- * Class to download and manage files from the blobstore.  It uses an LRU cache
- * to determine which files to keep so they can be reused and which files to delete.
- */
-public class Localizer {
-  public static final Logger LOG = LoggerFactory.getLogger(Localizer.class);
-  public static final String FILECACHE = "filecache";
-  public static final String USERCACHE = "usercache";
-  // sub directories to store either files or uncompressed archives respectively
-  public static final String FILESDIR = "files";
-  public static final String ARCHIVESDIR = "archives";
-
-  private static final String TO_UNCOMPRESS = "_tmp_";
-  
-  
-  
-  private final Map<String, Object> _conf;
-  private final int _threadPoolSize;
-  // thread pool for initial download
-  private final ExecutorService _execService;
-  // thread pool for updates
-  private final ExecutorService _updateExecService;
-  private final int _blobDownloadRetries;
-
-  // track resources - user to resourceSet
-  private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new
-      ConcurrentHashMap<String, LocalizedResourceSet>();
-
-  private final String _localBaseDir;
-
-  // cleanup
-  private long _cacheTargetSize;
-  private long _cacheCleanupPeriod;
-  private ScheduledExecutorService _cacheCleanupService;
-
-  public Localizer(Map<String, Object> conf, String baseDir) {
-    _conf = conf;
-    _localBaseDir = baseDir;
-    // default cache size 10GB, converted to Bytes
-    _cacheTargetSize = ObjectReader.getInt(_conf.get(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB),
-            10 * 1024).longValue() << 20;
-    // default 10 minutes.
-    _cacheCleanupPeriod = ObjectReader.getInt(_conf.get(
-            DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 10 * 60 * 1000).longValue();
-
-    // if we needed we could make config for update thread pool size
-    _threadPoolSize = ObjectReader.getInt(_conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
-    _blobDownloadRetries = ObjectReader.getInt(_conf.get(
-            DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
-
-    _execService = Executors.newFixedThreadPool(_threadPoolSize);
-    _updateExecService = Executors.newFixedThreadPool(_threadPoolSize);
-    reconstructLocalizedResources();
-  }
-
-  // For testing, it allows setting size in bytes
-  protected void setTargetCacheSize(long size) {
-    _cacheTargetSize = size;
-  }
-
-  // For testing, be careful as it doesn't clone
-  ConcurrentMap<String, LocalizedResourceSet> getUserResources() {
-    return _userRsrc;
-  }
-
-  public void startCleaner() {
-    _cacheCleanupService = new ScheduledThreadPoolExecutor(1,
-        new ThreadFactoryBuilder()
-            .setNameFormat("Localizer Cache Cleanup")
-            .build());
-
-    _cacheCleanupService.scheduleWithFixedDelay(new Runnable() {
-          @Override
-          public void run() {
-            handleCacheCleanup();
-          }
-        }, _cacheCleanupPeriod, _cacheCleanupPeriod, TimeUnit.MILLISECONDS);
-  }
-
-  public void shutdown() {
-    if (_cacheCleanupService != null) {
-      _cacheCleanupService.shutdown();
-    }
-    if (_execService != null) {
-      _execService.shutdown();
-    }
-    if (_updateExecService != null) {
-      _updateExecService.shutdown();
-    }
-  }
-
-  // baseDir/supervisor/usercache/
-  protected File getUserCacheDir() {
-    return new File(_localBaseDir, USERCACHE);
-  }
-
-  // baseDir/supervisor/usercache/user1/
-  protected File getLocalUserDir(String userName) {
-    return new File(getUserCacheDir(), userName);
-  }
-
-  // baseDir/supervisor/usercache/user1/filecache
-  public File getLocalUserFileCacheDir(String userName) {
-    return new File(getLocalUserDir(userName), FILECACHE);
-  }
-
-  // baseDir/supervisor/usercache/user1/filecache/files
-  protected File getCacheDirForFiles(File dir) {
-    return new File(dir, FILESDIR);
-  }
-
-  // get the directory to put uncompressed archives in
-  // baseDir/supervisor/usercache/user1/filecache/archives
-  protected File getCacheDirForArchives(File dir) {
-    return new File(dir, ARCHIVESDIR);
-  }
-
-  protected void addLocalizedResourceInDir(String dir, LocalizedResourceSet lrsrcSet,
-      boolean uncompress) {
-    File[] lrsrcs = readCurrentBlobs(dir);
-
-    if (lrsrcs != null) {
-      for (File rsrc : lrsrcs) {
-        LOG.info("add localized in dir found: " + rsrc);
-        /// strip off .suffix
-        String path = rsrc.getPath();
-        int p = path.lastIndexOf('.');
-        if (p > 0) {
-          path = path.substring(0, p);
-        }
-        LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path);
-        LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path,
-            uncompress);
-        lrsrcSet.add(lrsrc.getKey(), lrsrc, uncompress);
-      }
-    }
-  }
-
-  // Looks for files in the directory with .current suffix
-  protected File[] readCurrentBlobs(String location) {
-    File dir = new File(location);
-    File[] files = null;
-    if (dir.exists()) {
-      files = dir.listFiles(new FilenameFilter() {
-        @Override
-        public boolean accept(File dir, String name) {
-          return name.toLowerCase().endsWith(ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-        }
-      });
-    }
-    return files;
-  }
-
-  // Check to see if there are any existing files already localized.
-  protected void reconstructLocalizedResources() {
-    try {
-      LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath());
-      Collection<File> users = ConfigUtils.readDirFiles(getUserCacheDir().getPath());
-      if (!(users == null || users.isEmpty())) {
-        for (File userDir : users) {
-          String user = userDir.getName();
-          LOG.debug("looking in: {} for user: {}", userDir.getPath(), user);
-          LocalizedResourceSet newSet = new LocalizedResourceSet(user);
-          LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
-          if (lrsrcSet == null) {
-            lrsrcSet = newSet;
-          }
-          addLocalizedResourceInDir(getCacheDirForFiles(getLocalUserFileCacheDir(user)).getPath(),
-              lrsrcSet, false);
-          addLocalizedResourceInDir(
-              getCacheDirForArchives(getLocalUserFileCacheDir(user)).getPath(),
-              lrsrcSet, true);
-        }
-      } else {
-        LOG.warn("No left over resources found for any user during reconstructing of local resources at: {}", getUserCacheDir().getPath());
-      }
-    } catch (Exception e) {
-      LOG.error("ERROR reconstructing localized resources", e);
-    }
-  }
-
-  // ignores invalid user/topo/key
-  public synchronized void removeBlobReference(String key, String user, String topo,
-      boolean uncompress) throws AuthorizationException, KeyNotFoundException {
-    LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
-    if (lrsrcSet != null) {
-      LocalizedResource lrsrc = lrsrcSet.get(key, uncompress);
-      if (lrsrc != null) {
-        LOG.debug("removing blob reference to: {} for topo: {}", key, topo);
-        lrsrc.removeReference(topo);
-      } else {
-        LOG.warn("trying to remove non-existent blob, key: " + key + " for user: " + user +
-            " topo: " + topo);
-      }
-    } else {
-      LOG.warn("trying to remove blob for non-existent resource set for user: " + user + " key: "
-          + key + " topo: " + topo);
-    }
-  }
-
-  public synchronized void addReferences(List<LocalResource> localresource, String user,
-       String topo) {
-    LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
-    if (lrsrcSet != null) {
-      for (LocalResource blob : localresource) {
-        LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName(), blob.shouldUncompress());
-        if (lrsrc != null) {
-          lrsrc.addReference(topo);
-          LOG.debug("added reference for topo: {} key: {}", topo, blob);
-        } else {
-          LOG.warn("trying to add reference to non-existent blob, key: " + blob + " topo: " + topo);
-        }
-      }
-    } else {
-      LOG.warn("trying to add reference to non-existent local resource set, " +
-          "user: " + user + " topo: " + topo);
-    }
-  }
-
-  /**
-   * This function either returns the blob in the existing cache or if it doesn't exist in the
-   * cache, it will download the blob and will block until the download is complete.
-   */
-  public LocalizedResource getBlob(LocalResource localResource, String user, String topo,
-       File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException {
-    ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
-    arr.add(localResource);
-    List<LocalizedResource> results = getBlobs(arr, user, topo, userFileDir);
-    if (results.isEmpty() || results.size() != 1) {
-      throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + user +
-          ", topo: " + topo);
-    }
-    return results.get(0);
-  }
-
-  protected boolean isLocalizedResourceDownloaded(LocalizedResource lrsrc) {
-    File rsrcFileCurrent = new File(lrsrc.getCurrentSymlinkPath());
-    File rsrcFileWithVersion = new File(lrsrc.getFilePathWithVersion());
-    File versionFile = new File(lrsrc.getVersionFilePath());
-    return (rsrcFileWithVersion.exists() && rsrcFileCurrent.exists() && versionFile.exists());
-  }
-
-  protected boolean isLocalizedResourceUpToDate(LocalizedResource lrsrc,
-      ClientBlobStore blobstore) throws AuthorizationException, KeyNotFoundException {
-    String localFile = lrsrc.getFilePath();
-    long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(lrsrc.getKey(), blobstore);
-    long currentBlobVersion = ServerUtils.localVersionOfBlob(localFile);
-    return (nimbusBlobVersion == currentBlobVersion);
-  }
-
-  protected ClientBlobStore getClientBlobStore() {
-    return ServerUtils.getClientBlobStoreForSupervisor(_conf);
-  }
-
-  /**
-   * This function updates blobs on the supervisor. It uses a separate thread pool and runs
-   * asynchronously of the download and delete.
-   */
-  public List<LocalizedResource> updateBlobs(List<LocalResource> localResources,
-       String user) throws AuthorizationException, KeyNotFoundException, IOException {
-    LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
-    ArrayList<LocalizedResource> results = new ArrayList<>();
-    ArrayList<Callable<LocalizedResource>> updates = new ArrayList<>();
-
-    if (lrsrcSet == null) {
-      // resource set must have been removed
-      return results;
-    }
-    ClientBlobStore blobstore = null;
-    try {
-      blobstore = getClientBlobStore();
-      for (LocalResource localResource: localResources) {
-        String key = localResource.getBlobName();
-        LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
-        if (lrsrc == null) {
-          LOG.warn("blob requested for update doesn't exist: {}", key);
-          continue;
-        } else if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
-          LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
-          continue;
-        } else {
-          // update it if either the version isn't the latest or if any local blob files are missing
-          if (!isLocalizedResourceUpToDate(lrsrc, blobstore) ||
-              !isLocalizedResourceDownloaded(lrsrc)) {
-            LOG.debug("updating blob: {}", key);
-            updates.add(new DownloadBlob(this, _conf, key, new File(lrsrc.getFilePath()), user,
-                lrsrc.isUncompressed(), true));
-          }
-        }
-      }
-    } finally {
-      if(blobstore != null) {
-        blobstore.shutdown();
-      }
-    }
-    try {
-      List<Future<LocalizedResource>> futures = _updateExecService.invokeAll(updates);
-      for (Future<LocalizedResource> futureRsrc : futures) {
-        try {
-          LocalizedResource lrsrc = futureRsrc.get();
-          // put the resource just in case it was removed at same time by the cleaner
-          LocalizedResourceSet newSet = new LocalizedResourceSet(user);
-          LocalizedResourceSet newlrsrcSet = _userRsrc.putIfAbsent(user, newSet);
-          if (newlrsrcSet == null) {
-            newlrsrcSet = newSet;
-          }
-          newlrsrcSet.putIfAbsent(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
-          results.add(lrsrc);
-        }
-        catch (ExecutionException e) {
-          LOG.error("Error updating blob: ", e);
-          if (e.getCause() instanceof AuthorizationException) {
-            throw (AuthorizationException)e.getCause();
-          }
-          if (e.getCause() instanceof KeyNotFoundException) {
-            throw (KeyNotFoundException)e.getCause();
-          }
-        }
-      }
-    } catch (RejectedExecutionException re) {
-      LOG.error("Error updating blobs : ", re);
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted Exception", ie);
-    }
-    return results;
-  }
-
-  /**
-   * This function either returns the blobs in the existing cache or if they don't exist in the
-   * cache, it downloads them in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT)
-   * and will block until all of them have been downloaded
-   */
-  public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources,
-      String user, String topo, File userFileDir)
-      throws AuthorizationException, KeyNotFoundException, IOException {
-    if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
-      throw new KeyNotFoundException("symlinks are disabled so blobs cannot be downloaded.");
-    }
-    LocalizedResourceSet newSet = new LocalizedResourceSet(user);
-    LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
-    if (lrsrcSet == null) {
-      lrsrcSet = newSet;
-    }
-    ArrayList<LocalizedResource> results = new ArrayList<>();
-    ArrayList<Callable<LocalizedResource>> downloads = new ArrayList<>();
-
-    ClientBlobStore blobstore = null;
-    try {
-      blobstore = getClientBlobStore();
-      for (LocalResource localResource: localResources) {
-        String key = localResource.getBlobName();
-        boolean uncompress = localResource.shouldUncompress();
-        LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
-        boolean isUpdate = false;
-        if ((lrsrc != null) && (lrsrc.isUncompressed() == localResource.shouldUncompress()) &&
-            (isLocalizedResourceDownloaded(lrsrc))) {
-          if (isLocalizedResourceUpToDate(lrsrc, blobstore)) {
-            LOG.debug("blob already exists: {}", key);
-            lrsrc.addReference(topo);
-            results.add(lrsrc);
-            continue;
-          }
-          LOG.debug("blob exists but isn't up to date: {}", key);
-          isUpdate = true;
-        }
-
-        // go off to blobstore and get it
-        // assume dir passed in exists and has correct permission
-        LOG.debug("fetching blob: {}", key);
-        File downloadDir = getCacheDirForFiles(userFileDir);
-        File localFile = new File(downloadDir, key);
-        if (uncompress) {
-          // for compressed file, download to archives dir
-          downloadDir = getCacheDirForArchives(userFileDir);
-          localFile = new File(downloadDir, key);
-        }
-        downloadDir.mkdir();
-        downloads.add(new DownloadBlob(this, _conf, key, localFile, user, uncompress,
-            isUpdate));
-      }
-    } finally {
-      if(blobstore !=null) {
-        blobstore.shutdown();
-      }
-    }
-    try {
-      List<Future<LocalizedResource>> futures = _execService.invokeAll(downloads);
-      for (Future<LocalizedResource> futureRsrc: futures) {
-        LocalizedResource lrsrc = futureRsrc.get();
-        lrsrc.addReference(topo);
-        lrsrcSet.add(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
-        results.add(lrsrc);
-      }
-    } catch (ExecutionException e) {
-      if (e.getCause() instanceof AuthorizationException)
-        throw (AuthorizationException)e.getCause();
-      else if (e.getCause() instanceof KeyNotFoundException) {
-        throw (KeyNotFoundException)e.getCause();
-      } else {
-        throw new IOException("Error getting blobs", e);
-      }
-    } catch (RejectedExecutionException re) {
-      throw new IOException("RejectedExecutionException: ", re);
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted Exception", ie);
-    }
-    return results;
-  }
-
-  static class DownloadBlob implements Callable<LocalizedResource> {
-
-    private Localizer _localizer;
-    private Map _conf;
-    private String _key;
-    private File _localFile;
-    private String _user;
-    private boolean _uncompress;
-    private boolean _isUpdate;
-
-    public DownloadBlob(Localizer localizer, Map<String, Object> conf, String key, File localFile,
-        String user, boolean uncompress, boolean update) {
-      _localizer = localizer;
-      _conf = conf;
-      _key = key;
-      _localFile = localFile;
-      _user = user;
-      _uncompress = uncompress;
-      _isUpdate = update;
-    }
-
-    @Override
-    public LocalizedResource call()
-        throws AuthorizationException, KeyNotFoundException, IOException  {
-      return _localizer.downloadBlob(_conf, _key, _localFile, _user, _uncompress,
-        _isUpdate);
-    }
-  }
-
-  private LocalizedResource downloadBlob(Map<String, Object> conf, String key, File localFile,
-      String user, boolean uncompress, boolean isUpdate)
-      throws AuthorizationException, KeyNotFoundException, IOException {
-    ClientBlobStore blobstore = null;
-    try {
-      blobstore = getClientBlobStore();
-      long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(key, blobstore);
-      long oldVersion = ServerUtils.localVersionOfBlob(localFile.toString());
-      FileOutputStream out = null;
-      PrintWriter writer = null;
-      int numTries = 0;
-      String localizedPath = localFile.toString();
-      String localFileWithVersion = ServerUtils.constructBlobWithVersionFileName(localFile.toString(),
-              nimbusBlobVersion);
-      String localVersionFile = ServerUtils.constructVersionFileName(localFile.toString());
-      String downloadFile = localFileWithVersion;
-      if (uncompress) {
-        // we need to download to temp file and then unpack into the one requested
-        downloadFile = new File(localFile.getParent(), TO_UNCOMPRESS + localFile.getName()).toString();
-      }
-      while (numTries < _blobDownloadRetries) {
-        out = new FileOutputStream(downloadFile);
-        numTries++;
-        try {
-          if (!ServerUtils.canUserReadBlob(blobstore.getBlobMeta(key), user, conf)) {
-            throw new AuthorizationException(user + " does not have READ access to " + key);
-          }
-          InputStreamWithMeta in = blobstore.getBlob(key);
-          byte[] buffer = new byte[1024];
-          int len;
-          while ((len = in.read(buffer)) >= 0) {
-            out.write(buffer, 0, len);
-          }
-          out.close();
-          in.close();
-          if (uncompress) {
-            ServerUtils.unpack(new File(downloadFile), new File(localFileWithVersion));
-            LOG.debug("uncompressed " + downloadFile + " to: " + localFileWithVersion);
-          }
-
-          // Next write the version.
-          LOG.info("Blob: " + key + " updated with new Nimbus-provided version: " +
-              nimbusBlobVersion + " local version was: " + oldVersion);
-          // The false parameter ensures overwriting the version file, not appending
-          writer = new PrintWriter(
-              new BufferedWriter(new FileWriter(localVersionFile, false)));
-          writer.println(nimbusBlobVersion);
-          writer.close();
-
-          try {
-            setBlobPermissions(conf, user, localFileWithVersion);
-            setBlobPermissions(conf, user, localVersionFile);
-
-            // Update the key.current symlink. First create tmp symlink and do
-            // move of tmp to current so that the operation is atomic.
-            String tmp_uuid_local = java.util.UUID.randomUUID().toString();
-            LOG.debug("Creating a symlink @" + localFile + "." + tmp_uuid_local + " , " +
-                "linking to: " + localFile + "." + nimbusBlobVersion);
-            File uuid_symlink = new File(localFile + "." + tmp_uuid_local);
-
-            Files.createSymbolicLink(uuid_symlink.toPath(),
-                Paths.get(ServerUtils.constructBlobWithVersionFileName(localFile.toString(),
-                        nimbusBlobVersion)));
-            File current_symlink = new File(ServerUtils.constructBlobCurrentSymlinkName(
-                    localFile.toString()));
-            Files.move(uuid_symlink.toPath(), current_symlink.toPath(), ATOMIC_MOVE);
-          } catch (IOException e) {
-            // if we fail after writing the version file but before we move current link we need to
-            // restore the old version to the file
-            try {
-              PrintWriter restoreWriter = new PrintWriter(
-                  new BufferedWriter(new FileWriter(localVersionFile, false)));
-              restoreWriter.println(oldVersion);
-              restoreWriter.close();
-            } catch (IOException ignore) {}
-            throw e;
-          }
-
-          String oldBlobFile = localFile + "." + oldVersion;
-          try {
-            // Remove the old version. Note that if a number of processes have that file open,
-            // the OS will keep the old blob file around until they all close the handle and only
-            // then deletes it. No new process will open the old blob, since the users will open the
-            // blob through the "blob.current" symlink, which always points to the latest version of
-            // a blob. Remove the old version after the current symlink is updated as to not affect
-            // anyone trying to read it.
-            if ((oldVersion != -1) && (oldVersion != nimbusBlobVersion)) {
-              LOG.info("Removing an old blob file:" + oldBlobFile);
-              Files.delete(Paths.get(oldBlobFile));
-            }
-          } catch (IOException e) {
-            // At this point we have downloaded everything and moved symlinks.  If the remove of
-            // old fails just log an error
-            LOG.error("Exception removing old blob version: " + oldBlobFile);
-          }
-
-          break;
-        } catch (AuthorizationException ae) {
-          // we consider this non-retriable exceptions
-          if (out != null) {
-            out.close();
-          }
-          new File(downloadFile).delete();
-          throw ae;
-        } catch (IOException | KeyNotFoundException e) {
-          if (out != null) {
-            out.close();
-          }
-          if (writer != null) {
-            writer.close();
-          }
-          new File(downloadFile).delete();
-          if (uncompress) {
-            try {
-              FileUtils.deleteDirectory(new File(localFileWithVersion));
-            } catch (IOException ignore) {}
-          }
-          if (!isUpdate) {
-            // don't want to remove existing version file if its an update
-            new File(localVersionFile).delete();
-          }
-
-          if (numTries < _blobDownloadRetries) {
-            LOG.error("Failed to download blob, retrying", e);
-          } else {
-            throw e;
-          }
-        }
-      }
-      return new LocalizedResource(key, localizedPath, uncompress);
-    } finally {
-      if(blobstore != null) {
-        blobstore.shutdown();
-      }
-    }
-  }
-
-  public void setBlobPermissions(Map<String, Object> conf, String user, String path)
-      throws IOException {
-
-    if (!ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
-      return;
-    }
-    String wlCommand = ObjectReader.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), "");
-    if (wlCommand.isEmpty()) {
-      String stormHome = System.getProperty("storm.home");
-      wlCommand = stormHome + "/bin/worker-launcher";
-    }
-    List<String> command = new ArrayList<String>(Arrays.asList(wlCommand, user, "blob", path));
-
-    String[] commandArray = command.toArray(new String[command.size()]);
-    ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
-    LOG.info("Setting blob permissions, command: {}", Arrays.toString(commandArray));
-
-    try {
-      shExec.execute();
-      LOG.debug("output: {}", shExec.getOutput());
-    } catch (ExitCodeException e) {
-      int exitCode = shExec.getExitCode();
-      LOG.warn("Exit code from worker-launcher is : " + exitCode, e);
-      LOG.debug("output: {}", shExec.getOutput());
-      throw new IOException("Setting blob permissions failed" +
-          " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
-    }
-  }
-
-
-  public synchronized void handleCacheCleanup() {
-    LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(_cacheTargetSize);
-    // need one large set of all and then clean via LRU
-    for (LocalizedResourceSet t : _userRsrc.values()) {
-      toClean.addResources(t);
-      LOG.debug("Resources to be cleaned after adding {} : {}", t.getUser(), toClean);
-    }
-    toClean.cleanup();
-    LOG.debug("Resource cleanup: {}", toClean);
-    for (LocalizedResourceSet t : _userRsrc.values()) {
-      if (t.getSize() == 0) {
-        String user = t.getUser();
-
-        LOG.debug("removing empty set: {}", user);
-        File userFileCacheDir = getLocalUserFileCacheDir(user);
-        getCacheDirForFiles(userFileCacheDir).delete();
-        getCacheDirForArchives(userFileCacheDir).delete();
-        getLocalUserFileCacheDir(user).delete();
-        boolean dirsRemoved = getLocalUserDir(user).delete();
-        // to catch race with update thread
-        if (dirsRemoved) {
-          _userRsrc.remove(user);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 311acda..7b127d2 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -33,23 +33,26 @@ import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.blobstore.LocalFsBlobStore;
 import org.apache.storm.daemon.StormCommon;
-import org.apache.storm.generated.*;
-import org.apache.storm.localizer.Localizer;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.thrift.TException;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
@@ -61,7 +64,6 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -285,10 +287,6 @@ public class ServerUtils {
         return Files.getOwner(FileSystems.getDefault().getPath(path)).getName();
     }
 
-    public static Localizer createLocalizer(Map<String, Object> conf, String baseDir) {
-        return new Localizer(conf, baseDir);
-    }
-
     public static String containerFilePath (String dir) {
         return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index eb25566..60b628e 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -25,7 +25,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.storm.daemon.supervisor.Slot.StaticState;
@@ -39,7 +39,7 @@ import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.generated.ProfileAction;
 import org.apache.storm.generated.ProfileRequest;
 import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.AsyncLocalizer;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Time;
@@ -115,7 +115,7 @@ public class SlotTest {
     @Test
     public void testEmptyToEmpty() throws Exception {
         try (SimulatedTime t = new SimulatedTime(1010)){
-            ILocalizer localizer = mock(ILocalizer.class);
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             LocalState state = mock(LocalState.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
             ISupervisor iSuper = mock(ISupervisor.class);
@@ -137,7 +137,7 @@ public class SlotTest {
             LocalAssignment newAssignment = 
                     mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0));
             
-            ILocalizer localizer = mock(ILocalizer.class);
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             Container container = mock(Container.class);
             LocalState state = mock(LocalState.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -146,11 +146,11 @@ public class SlotTest {
             when(container.readHeartbeat()).thenReturn(hb, hb);
             
             @SuppressWarnings("unchecked")
-            Future<Void> baseFuture = mock(Future.class);
+            CompletableFuture<Void> baseFuture = mock(CompletableFuture.class);
             when(localizer.requestDownloadBaseTopologyBlobs(newAssignment, port)).thenReturn(baseFuture);
             
             @SuppressWarnings("unchecked")
-            Future<Void> blobFuture = mock(Future.class);
+            CompletableFuture<Void> blobFuture = mock(CompletableFuture.class);
             when(localizer.requestDownloadTopologyBlobs(newAssignment, port)).thenReturn(blobFuture);
             
             ISupervisor iSuper = mock(ISupervisor.class);
@@ -220,7 +220,7 @@ public class SlotTest {
             LocalAssignment assignment = 
                     mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0));
             
-            ILocalizer localizer = mock(ILocalizer.class);
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             Container container = mock(Container.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
             LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs()-10);
@@ -276,7 +276,7 @@ public class SlotTest {
             LocalAssignment nAssignment = 
                     mkLocalAssignment(nTopoId, nExecList, mkWorkerResources(100.0, 100.0, 100.0));
             
-            ILocalizer localizer = mock(ILocalizer.class);
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             Container nContainer = mock(Container.class);
             LocalState state = mock(LocalState.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -285,11 +285,11 @@ public class SlotTest {
             when(nContainer.readHeartbeat()).thenReturn(nhb, nhb);
             
             @SuppressWarnings("unchecked")
-            Future<Void> baseFuture = mock(Future.class);
+            CompletableFuture<Void> baseFuture = mock(CompletableFuture.class);
             when(localizer.requestDownloadBaseTopologyBlobs(nAssignment, port)).thenReturn(baseFuture);
             
             @SuppressWarnings("unchecked")
-            Future<Void> blobFuture = mock(Future.class);
+            CompletableFuture<Void> blobFuture = mock(CompletableFuture.class);
             when(localizer.requestDownloadTopologyBlobs(nAssignment, port)).thenReturn(blobFuture);
             
             ISupervisor iSuper = mock(ISupervisor.class);
@@ -377,7 +377,7 @@ public class SlotTest {
             when(cContainer.readHeartbeat()).thenReturn(chb);
             when(cContainer.areAllProcessesDead()).thenReturn(false, true);
             
-            ILocalizer localizer = mock(ILocalizer.class);
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
             
             ISupervisor iSuper = mock(ISupervisor.class);
@@ -437,7 +437,7 @@ public class SlotTest {
             when(cContainer.readHeartbeat()).thenReturn(chb, chb, chb, chb, chb, chb);
             when(cContainer.runProfiling(any(ProfileRequest.class), anyBoolean())).thenReturn(true);
             
-            ILocalizer localizer = mock(ILocalizer.class);
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
             
             ISupervisor iSuper = mock(ISupervisor.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 2461b33..f49be63 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -15,24 +15,51 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.localizer;
 
+import static org.apache.storm.localizer.AsyncLocalizer.USERCACHE;
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.*;
 
+import com.google.common.base.Joiner;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.blobstore.LocalFsBlobStore;
 import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Utils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.storm.Config;
@@ -41,9 +68,16 @@ import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.security.auth.DefaultPrincipalToLocal;
+import org.mockito.Mockito;
 
 public class AsyncLocalizerTest {
 
+    private static String getTestLocalizerRoot() {
+        File f = new File("./target/" + Thread.currentThread().getStackTrace()[2].getMethodName() + "/localizer/");
+        f.deleteOnExit();
+        return f.getPath();
+    }
+
     @Test
     public void testRequestDownloadBaseTopologyBlobs() throws Exception {
         final String topoId = "TOPO";
@@ -68,7 +102,6 @@ public class AsyncLocalizerTest {
         conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName());
         conf.put(Config.STORM_CLUSTER_MODE, "distributed");
         conf.put(Config.STORM_LOCAL_DIR, stormLocal);
-        Localizer localizer = mock(Localizer.class);
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         ConfigUtils mockedCU = mock(ConfigUtils.class);
         ReflectionUtils mockedRU = mock(ReflectionUtils.class);
@@ -76,7 +109,7 @@ public class AsyncLocalizerTest {
         
         Map<String, Object> topoConf = new HashMap<>(conf);
         
-        AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops);
+        AsyncLocalizer bl = new AsyncLocalizer(conf, getTestLocalizerRoot(), ops, new AtomicReference<>(new HashMap<>()), null);
         ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
         ReflectionUtils origRU = ReflectionUtils.setInstance(mockedRU);
         ServerUtils origUtils = ServerUtils.setInstance(mockedU);
@@ -86,7 +119,7 @@ public class AsyncLocalizerTest {
             when(mockedRU.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);
             when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
 
-            Future<Void> f = al.requestDownloadBaseTopologyBlobs(la, port);
+            Future<Void> f = bl.requestDownloadBaseTopologyBlobs(la, port);
             f.get(20, TimeUnit.SECONDS);
             // We should be done now...
             
@@ -102,7 +135,7 @@ public class AsyncLocalizerTest {
             
             verify(ops, never()).deleteIfExists(any(File.class));
         } finally {
-            al.shutdown();
+            bl.close();
             ConfigUtils.setInstance(orig);
             ReflectionUtils.setInstance(origRU);
             ServerUtils.setInstance(origUtils);
@@ -129,7 +162,7 @@ public class AsyncLocalizerTest {
         final File userDir = new File(stormLocal, user);
         final String stormRoot = stormLocal+topoId+"/";
         
-        final String localizerRoot = "/tmp/storm-localizer/";
+        final String localizerRoot = getTestLocalizerRoot();
         final String simpleLocalFile = localizerRoot + user + "/simple";
         final String simpleCurrentLocalFile = localizerRoot + user + "/simple.current";
        
@@ -146,7 +179,6 @@ public class AsyncLocalizerTest {
         
         Map<String, Object> conf = new HashMap<>();
         conf.put(Config.STORM_LOCAL_DIR, stormLocal);
-        Localizer localizer = mock(Localizer.class);
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         ConfigUtils mockedCU = mock(ConfigUtils.class);
 
@@ -157,33 +189,662 @@ public class AsyncLocalizerTest {
         List<LocalizedResource> localizedList = new ArrayList<>();
         LocalizedResource simpleLocal = new LocalizedResource(simpleKey, simpleLocalFile, false);
         localizedList.add(simpleLocal);
-        
-        AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops);
+
+        AsyncLocalizer bl = spy(new AsyncLocalizer(conf, localizerRoot, ops, new AtomicReference<>(new HashMap<>()), null));
         ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
         try {
             when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot);
             when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
             when(mockedCU.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(st);
-            
-            when(localizer.getLocalUserFileCacheDir(user)).thenReturn(userDir);
-            
-            when(localizer.getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir))).thenReturn(localizedList);
-            
-            Future<Void> f = al.requestDownloadTopologyBlobs(la, port);
+
+            // Stub with doReturn(..).when(spy) instead of when(..).thenReturn(..) so the real method is not called on the spy object
+            doReturn(userDir).when(bl).getLocalUserFileCacheDir(user);
+            doReturn(localizedList).when(bl).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
+
+            Future<Void> f = bl.requestDownloadTopologyBlobs(la, port);
             f.get(20, TimeUnit.SECONDS);
             // We should be done now...
-            
-            verify(localizer).getLocalUserFileCacheDir(user);
+
+            verify(bl).getLocalUserFileCacheDir(user);
+
             verify(ops).fileExists(userDir);
             verify(ops).forceMkdir(userDir);
-            
-            verify(localizer).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
+
+            verify(bl).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
 
             verify(ops).createSymlink(new File(stormRoot, simpleLocalName), new File(simpleCurrentLocalFile));
         } finally {
-            al.shutdown();
+            bl.close();
             ConfigUtils.setInstance(orig);
         }
     }
 
+
+    //From LocalizerTest
+    private File baseDir;
+
+    private final String user1 = "user1";
+    private final String user2 = "user2";
+    private final String user3 = "user3";
+
+    private ClientBlobStore mockblobstore = mock(ClientBlobStore.class);
+
+
+    class TestLocalizer extends AsyncLocalizer {
+
+        TestLocalizer(Map<String, Object> conf, String baseDir) throws IOException {
+            super(conf, baseDir, AdvancedFSOps.make(conf), new AtomicReference<>(new HashMap<>()), null);
+        }
+
+        @Override
+        protected ClientBlobStore getClientBlobStore() {
+            return mockblobstore;
+        }
+    }
+
+    class TestInputStreamWithMeta extends InputStreamWithMeta {
+        private InputStream iostream;
+
+        public TestInputStreamWithMeta() {
+            iostream = IOUtils.toInputStream("some test data for my input stream");
+        }
+
+        public TestInputStreamWithMeta(InputStream istream) {
+            iostream = istream;
+        }
+
+        @Override
+        public long getVersion() throws IOException {
+            return 1;
+        }
+
+        @Override
+        public synchronized int read() {
+            return 0;
+        }
+
+        @Override
+        public synchronized int read(byte[] b)
+            throws IOException {
+            int length = iostream.read(b);
+            if (length == 0) {
+                return -1;
+            }
+            return length;
+        }
+
+        @Override
+        public long getFileLength() {
+            return 0;
+        }
+    };
+
+    @Before
+    public void setUp() throws Exception {
+        baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-"+ UUID.randomUUID());
+        if (!baseDir.mkdir()) {
+            throw new IOException("failed to create base directory");
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            FileUtils.deleteDirectory(baseDir);
+        } catch (IOException ignore) {}
+    }
+
+    protected String joinPath(String... pathList) {
+        return Joiner.on(File.separator).join(pathList);
+    }
+
+    public String constructUserCacheDir(String base, String user) {
+        return joinPath(base, USERCACHE, user);
+    }
+
+    public String constructExpectedFilesDir(String base, String user) {
+        return joinPath(constructUserCacheDir(base, user), AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+    }
+
+    public String constructExpectedArchivesDir(String base, String user) {
+        return joinPath(constructUserCacheDir(base, user), AsyncLocalizer.FILECACHE, AsyncLocalizer.ARCHIVESDIR);
+    }
+
+    @Test
+    public void testDirPaths() throws Exception {
+        Map<String, Object> conf = new HashMap();
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+        String expectedDir = constructUserCacheDir(baseDir.toString(), user1);
+        assertEquals("get local user dir doesn't return right value",
+            expectedDir, localizer.getLocalUserDir(user1).toString());
+
+        String expectedFileDir = joinPath(expectedDir, AsyncLocalizer.FILECACHE);
+        assertEquals("get local user file dir doesn't return right value",
+            expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString());
+    }
+
+    @Test
+    public void testReconstruct() throws Exception {
+        Map<String, Object> conf = new HashMap();
+
+        String expectedFileDir1 = constructExpectedFilesDir(baseDir.toString(), user1);
+        String expectedArchiveDir1 = constructExpectedArchivesDir(baseDir.toString(), user1);
+        String expectedFileDir2 = constructExpectedFilesDir(baseDir.toString(), user2);
+        String expectedArchiveDir2 = constructExpectedArchivesDir(baseDir.toString(), user2);
+
+        String key1 = "testfile1.txt";
+        String key2 = "testfile2.txt";
+        String key3 = "testfile3.txt";
+        String key4 = "testfile4.txt";
+
+        String archive1 = "archive1";
+        String archive2 = "archive2";
+
+        File user1file1 = new File(expectedFileDir1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File user1file2 = new File(expectedFileDir1, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File user2file3 = new File(expectedFileDir2, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File user2file4 = new File(expectedFileDir2, key4 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+        File user1archive1 = new File(expectedArchiveDir1, archive1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File user2archive2 = new File(expectedArchiveDir2, archive2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File user1archive1file = new File(user1archive1, "file1");
+        File user2archive2file = new File(user2archive2, "file2");
+
+        // setup some files/dirs to emulate supervisor restart
+        assertTrue("Failed setup filecache dir1", new File(expectedFileDir1).mkdirs());
+        assertTrue("Failed setup filecache dir2", new File(expectedFileDir2).mkdirs());
+        assertTrue("Failed setup file1", user1file1.createNewFile());
+        assertTrue("Failed setup file2", user1file2.createNewFile());
+        assertTrue("Failed setup file3", user2file3.createNewFile());
+        assertTrue("Failed setup file4", user2file4.createNewFile());
+        assertTrue("Failed setup archive dir1", user1archive1.mkdirs());
+        assertTrue("Failed setup archive dir2", user2archive2.mkdirs());
+        assertTrue("Failed setup file in archivedir1", user1archive1file.createNewFile());
+        assertTrue("Failed setup file in archivedir2", user2archive2file.createNewFile());
+
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+        ArrayList<LocalResource> arrUser1Keys = new ArrayList<LocalResource>();
+        arrUser1Keys.add(new LocalResource(key1, false));
+        arrUser1Keys.add(new LocalResource(archive1, true));
+        localizer.addReferences(arrUser1Keys, user1, "topo1");
+
+        LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
+        assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+        LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
+        assertNotNull("Local resource doesn't exist but should", key1rsrc);
+        assertEquals("key doesn't match", key1, key1rsrc.getKey());
+        assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+        LocalizedResource key2rsrc = lrsrcSet.get(key2, false);
+        assertNotNull("Local resource doesn't exist but should", key2rsrc);
+        assertEquals("key doesn't match", key2, key2rsrc.getKey());
+        assertEquals("refcount doesn't match", 0, key2rsrc.getRefCount());
+        LocalizedResource archive1rsrc = lrsrcSet.get(archive1, true);
+        assertNotNull("Local resource doesn't exist but should", archive1rsrc);
+        assertEquals("key doesn't match", archive1, archive1rsrc.getKey());
+        assertEquals("refcount doesn't match", 1, archive1rsrc.getRefCount());
+
+        LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
+        assertEquals("local resource set size wrong", 3, lrsrcSet2.getSize());
+        assertEquals("user doesn't match", user2, lrsrcSet2.getUser());
+        LocalizedResource key3rsrc = lrsrcSet2.get(key3, false);
+        assertNotNull("Local resource doesn't exist but should", key3rsrc);
+        assertEquals("key doesn't match", key3, key3rsrc.getKey());
+        assertEquals("refcount doesn't match", 0, key3rsrc.getRefCount());
+        LocalizedResource key4rsrc = lrsrcSet2.get(key4, false);
+        assertNotNull("Local resource doesn't exist but should", key4rsrc);
+        assertEquals("key doesn't match", key4, key4rsrc.getKey());
+        assertEquals("refcount doesn't match", 0, key4rsrc.getRefCount());
+        LocalizedResource archive2rsrc = lrsrcSet2.get(archive2, true);
+        assertNotNull("Local resource doesn't exist but should", archive2rsrc);
+        assertEquals("key doesn't match", archive2, archive2rsrc.getKey());
+        assertEquals("refcount doesn't match", 0, archive2rsrc.getRefCount());
+    }
+
+    @Test
+    public void testArchivesTgz() throws Exception {
+        testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tgz")), true, 21344);
+    }
+
+    @Test
+    public void testArchivesZip() throws Exception {
+        testArchives(getFileFromResource(joinPath("localizer", "localtest.zip")), false, 21348);
+    }
+
+    @Test
+    public void testArchivesTarGz() throws Exception {
+        testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar.gz")), true, 21344);
+    }
+
+    @Test
+    public void testArchivesTar() throws Exception {
+        testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar")), true, 21344);
+    }
+
+    @Test
+    public void testArchivesJar() throws Exception {
+        testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.jar")), false, 21416);
+    }
+
+    private File getFileFromResource(String archivePath) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(archivePath).getFile());
+    }
+
+    // archive passed in must contain symlink named tmptestsymlink if not a zip file
+    public void testArchives(File archiveFile, boolean supportSymlinks, int size) throws Exception {
+        if (Utils.isOnWindows()) {
+            // On Windows, symlinks inside compressed files don't work properly, so force this to false.
+            supportSymlinks = false;
+        }
+
+        Map<String, Object> conf = new HashMap();
+        // set the cleanup interval really high so that it doesn't kick in during the test
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+        String key1 = archiveFile.getName();
+        String topo1 = "topo1";
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+        // set the target cache size really small so that cleanup will remove the blob
+        localizer.setTargetCacheSize(1);
+
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+        when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+
+        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new
+            FileInputStream(archiveFile.getAbsolutePath())));
+
+        long timeBefore = System.nanoTime();
+        File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+        assertTrue("failed to create user dir", user1Dir.mkdirs());
+        LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, true), user1, topo1,
+            user1Dir);
+        long timeAfter = System.nanoTime();
+
+        String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+        String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.ARCHIVESDIR);
+        assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+        File keyFile = new File(expectedFileDir, key1 + ".0");
+        assertTrue("blob not created", keyFile.exists());
+        assertTrue("blob is not uncompressed", keyFile.isDirectory());
+        File symlinkFile = new File(keyFile, "tmptestsymlink");
+
+        if (supportSymlinks) {
+            assertTrue("blob uncompressed doesn't contain symlink", Files.isSymbolicLink(
+                symlinkFile.toPath()));
+        } else {
+            assertTrue("blob symlink file doesn't exist", symlinkFile.exists());
+        }
+
+        LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+        assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+        LocalizedResource key1rsrc = lrsrcSet.get(key1, true);
+        assertNotNull("Local resource doesn't exist but should", key1rsrc);
+        assertEquals("key doesn't match", key1, key1rsrc.getKey());
+        assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+        assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePathWithVersion());
+        assertEquals("size doesn't match", size, key1rsrc.getSize());
+        assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+            .getLastAccessTime() <= timeAfter));
+
+        timeBefore = System.nanoTime();
+        localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, true);
+        timeAfter = System.nanoTime();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+        key1rsrc = lrsrcSet.get(key1, true);
+        assertNotNull("Local resource doesn't exist but should", key1rsrc);
+        assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
+        assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+            .getLastAccessTime() <= timeAfter));
+
+        // should remove the blob since cache size set really small
+        localizer.cleanup();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        assertFalse("blob contents not deleted", symlinkFile.exists());
+        assertFalse("blob not deleted", keyFile.exists());
+        assertFalse("blob file dir not deleted", new File(expectedFileDir).exists());
+        assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
+        assertNull("user set should be null", lrsrcSet);
+
+    }
+
+    /** Localizes one non-archive blob, releases it, and checks that cleanup removes it and the user's dirs. */
+    @Test
+    public void testBasic() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        // one-hour cleanup interval, so the timed cleanup doesn't kick in during the test
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
+
+        String key1 = "key1";
+        String topo1 = "topo1";
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+        // set really small so the explicit cleanup() below has to evict the blob
+        localizer.setTargetCacheSize(1);
+
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+        when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+
+        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+
+        long timeBefore = System.nanoTime();
+        File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+        assertTrue("failed to create user dir", user1Dir.mkdirs());
+        LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+        long timeAfter = System.nanoTime();
+
+        String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+        String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+        assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+        File keyFile = new File(expectedFileDir, key1);
+        File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+        assertTrue("blob not created", keyFileCurrentSymlink.exists());
+
+        LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+        assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+        LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
+        assertNotNull("Local resource doesn't exist but should", key1rsrc);
+        assertEquals("key doesn't match", key1, key1rsrc.getKey());
+        assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+        assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePath());
+        assertEquals("size doesn't match", 34, key1rsrc.getSize());
+        assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+            .getLastAccessTime() <= timeAfter));
+
+        timeBefore = System.nanoTime();
+        localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+        timeAfter = System.nanoTime();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+        key1rsrc = lrsrcSet.get(key1, false);
+        assertNotNull("Local resource doesn't exist but should", key1rsrc);
+        assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
+        assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+            .getLastAccessTime() <= timeAfter));
+
+        // should remove the blob since cache size set really small
+        localizer.cleanup();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        assertNull("user set should be null", lrsrcSet);
+        assertFalse("blob not deleted", keyFile.exists());
+        assertFalse("blob dir not deleted", new File(expectedFileDir).exists());
+        assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
+    }
+
+    /** Localizes three blobs for one user and checks that cleanup evicts the least recently released first. */
+    @Test
+    public void testMultipleKeysOneUser() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        // one-hour cleanup interval, so the timed cleanup doesn't kick in during the test
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
+
+        String key1 = "key1";
+        String topo1 = "topo1";
+        String key2 = "key2";
+        String key3 = "key3";
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+        // set to keep 2 blobs (each of size 34)
+        localizer.setTargetCacheSize(68);
+
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+        when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+        when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta());
+        when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta());
+
+        List<LocalResource> keys = Arrays.asList(new LocalResource(key1, false),
+            new LocalResource(key2, false), new LocalResource(key3, false));
+        File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+        assertTrue("failed to create user dir", user1Dir.mkdirs());
+
+        List<LocalizedResource> lrsrcs = localizer.getBlobs(keys, user1, topo1, user1Dir);
+        LocalizedResource lrsrc = lrsrcs.get(0);
+        LocalizedResource lrsrc2 = lrsrcs.get(1);
+        LocalizedResource lrsrc3 = lrsrcs.get(2);
+
+        String expectedFileDir = joinPath(baseDir.toString(), USERCACHE, user1,
+            AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+        assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+        File keyFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File keyFile2 = new File(expectedFileDir, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File keyFile3 = new File(expectedFileDir, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+        assertTrue("blob not created", keyFile.exists());
+        assertTrue("blob not created", keyFile2.exists());
+        assertTrue("blob not created", keyFile3.exists());
+        assertEquals("size doesn't match", 34, keyFile.length());
+        assertEquals("size doesn't match", 34, keyFile2.length());
+        assertEquals("size doesn't match", 34, keyFile3.length());
+        assertEquals("size doesn't match", 34, lrsrc.getSize());
+        assertEquals("size doesn't match", 34, lrsrc3.getSize());
+        assertEquals("size doesn't match", 34, lrsrc2.getSize());
+
+        LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
+        assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+
+        long timeBefore = System.nanoTime();
+        localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+        localizer.removeBlobReference(lrsrc2.getKey(), user1, topo1, false);
+        localizer.removeBlobReference(lrsrc3.getKey(), user1, topo1, false);
+        long timeAfter = System.nanoTime();
+
+        // re-acquire and release key1 so it gets the newest access time (re-acquiring alone must not change it)
+        lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+        assertTrue("timestamp not within range", (lrsrc.getLastAccessTime() >= timeBefore && lrsrc
+            .getLastAccessTime() <= timeAfter));
+        localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+
+        // should remove the second blob first
+        localizer.cleanup();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 2, lrsrcSet.getSize());
+        // wait up to 100 ms for key2's file to disappear before asserting on it
+        long end = System.currentTimeMillis() + 100;
+        while ((end - System.currentTimeMillis()) >= 0 && keyFile2.exists()) {
+            Thread.sleep(1);
+        }
+        assertFalse("blob not deleted", keyFile2.exists());
+        assertTrue("blob deleted", keyFile.exists());
+        assertTrue("blob deleted", keyFile3.exists());
+
+        // shrink the target to a single blob (34) so the next cleanup also removes the third blob
+        localizer.setTargetCacheSize(34);
+        localizer.cleanup();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+        assertTrue("blob deleted", keyFile.exists());
+        assertFalse("blob not deleted", keyFile3.exists());
+    }
+
+    /** With ACL validation enabled, fetching a blob whose ACL lacks read access for the user must throw. */
+    @Test(expected = AuthorizationException.class)
+    public void testFailAcls() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        // one-hour cleanup interval so cleanup doesn't kick in; ACL validation is what this test exercises
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
+        conf.put(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED, true);
+
+        String topo1 = "topo1";
+        String key1 = "key1";
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        // set acl so user doesn't have read access
+        AccessControl acl = new AccessControl(AccessControlType.USER, BlobStoreAclHandler.ADMIN);
+        acl.set_name(user1);
+        rbm.set_settable(new SettableBlobMeta(Arrays.asList(acl)));
+        when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+        File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+        assertTrue("failed to create user dir", user1Dir.mkdirs());
+
+        // This should throw AuthorizationException because auth failed
+        localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+    }
+
+    /** Expects getBlob on a spied LocalFsBlobStore to throw for key1, which this test never creates. */
+    @Test(expected = KeyNotFoundException.class)
+    public void testKeyNotFoundException() throws Exception {
+        String key1 = "key1";
+        Map<String, Object> conf = Utils.readStormConfig();
+        conf.put(Config.STORM_LOCAL_DIR, "target");
+        LocalFsBlobStore blobStoreSpy = spy(new LocalFsBlobStore());
+        Mockito.doReturn(true).when(blobStoreSpy).checkForBlobOrDownload(key1);
+        Mockito.doNothing().when(blobStoreSpy).checkForBlobUpdate(key1);
+        blobStoreSpy.prepare(conf, null, null);
+        blobStoreSpy.getBlob(key1, null);
+    }
+
+    /**
+     * Localizes blobs for three users (key1 for both user1 and user3) and checks that releasing user1's
+     * reference lets cleanup evict only user1's copy while the other users' blobs stay on disk.
+     */
+    @Test
+    public void testMultipleUsers() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        // one-hour cleanup interval, so the timed cleanup doesn't kick in during the test
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
+
+        String topo1 = "topo1";
+        String topo2 = "topo2";
+        String topo3 = "topo3";
+        String key1 = "key1";
+        String key2 = "key2";
+        String key3 = "key3";
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+        // set to keep 2 blobs (each of size 34)
+        localizer.setTargetCacheSize(68);
+
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+        when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+        when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta());
+        when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta());
+
+        File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+        assertTrue("failed to create user dir", user1Dir.mkdirs());
+        File user2Dir = localizer.getLocalUserFileCacheDir(user2);
+        assertTrue("failed to create user dir", user2Dir.mkdirs());
+        File user3Dir = localizer.getLocalUserFileCacheDir(user3);
+        assertTrue("failed to create user dir", user3Dir.mkdirs());
+
+        LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+        localizer.getBlob(new LocalResource(key2, false), user2, topo2, user2Dir);
+        localizer.getBlob(new LocalResource(key3, false), user3, topo3, user3Dir);
+        // make sure we support different user reading same blob
+        localizer.getBlob(new LocalResource(key1, false), user3, topo3, user3Dir);
+
+        String expectedUserDir1 = joinPath(baseDir.toString(), USERCACHE, user1);
+        String expectedFileDirUser1 = joinPath(expectedUserDir1, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+        String expectedFileDirUser2 = joinPath(baseDir.toString(), USERCACHE, user2,
+            AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+        String expectedFileDirUser3 = joinPath(baseDir.toString(), USERCACHE, user3,
+            AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+        assertTrue("user filecache dir user1 not created", new File(expectedFileDirUser1).exists());
+        assertTrue("user filecache dir user2 not created", new File(expectedFileDirUser2).exists());
+        assertTrue("user filecache dir user3 not created", new File(expectedFileDirUser3).exists());
+
+        File keyFile = new File(expectedFileDirUser1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File keyFile2 = new File(expectedFileDirUser2, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File keyFile3 = new File(expectedFileDirUser3, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        File keyFile1user3 = new File(expectedFileDirUser3, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+        assertTrue("blob not created", keyFile.exists());
+        assertTrue("blob not created", keyFile2.exists());
+        assertTrue("blob not created", keyFile3.exists());
+        assertTrue("blob not created", keyFile1user3.exists());
+
+        LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+        LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
+        assertEquals("local resource set size wrong", 1, lrsrcSet2.getSize());
+        LocalizedResourceSet lrsrcSet3 = localizer.getUserResources().get(user3);
+        assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
+
+        localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+        // only user1's key1 is unreferenced now, so cleanup should remove it together with user1's dirs
+        localizer.cleanup();
+
+        lrsrcSet = localizer.getUserResources().get(user1);
+        lrsrcSet3 = localizer.getUserResources().get(user3);
+        assertNull("user set should be null", lrsrcSet);
+        assertFalse("blob dir not deleted", new File(expectedFileDirUser1).exists());
+        assertFalse("blob dir not deleted", new File(expectedUserDir1).exists());
+        assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
+
+        assertTrue("blob deleted", keyFile2.exists());
+        assertFalse("blob not deleted", keyFile.exists());
+        assertTrue("blob deleted", keyFile3.exists());
+        assertTrue("blob deleted", keyFile1user3.exists());
+    }
+
+    /** Checks that a newer remote blob version is fetched both by a later getBlob call and by updateBlobs. */
+    @Test
+    public void testUpdate() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        // one-hour cleanup interval, so the timed cleanup doesn't kick in during the test
+        conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
+
+        String key1 = "key1";
+        String topo1 = "topo1";
+        String topo2 = "topo2";
+        AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+        ReadableBlobMeta rbm = new ReadableBlobMeta();
+        rbm.set_version(1);
+        rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+        when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+
+        File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+        assertTrue("failed to create user dir", user1Dir.mkdirs());
+        localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+
+        String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+        String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+        assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+        File keyFile = new File(expectedFileDir, key1);
+        File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+        assertTrue("blob not created", keyFileCurrentSymlink.exists());
+        File versionFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_BLOB_VERSION_SUFFIX);
+        assertTrue("blob version file not created", versionFile.exists());
+        assertEquals("blob version not correct", 1, ServerUtils.localVersionOfBlob(keyFile.toString()));
+
+        LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+        assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+
+        // test another topology getting blob with updated version - it should update version now
+        rbm.set_version(2);
+
+        localizer.getBlob(new LocalResource(key1, false), user1, topo2, user1Dir);
+        assertTrue("blob version file not created", versionFile.exists());
+        assertEquals("blob version not correct", 2, ServerUtils.localVersionOfBlob(keyFile.toString()));
+        assertTrue("blob file with version 2 not created", new File(keyFile + ".2").exists());
+
+        // now test regular updateBlob
+        rbm.set_version(3);
+
+        ArrayList<LocalResource> arr = new ArrayList<>();
+        arr.add(new LocalResource(key1, false));
+        localizer.updateBlobs(arr, user1);
+        assertTrue("blob version file not created", versionFile.exists());
+        assertEquals("blob version not correct", 3, ServerUtils.localVersionOfBlob(keyFile.toString()));
+        assertTrue("blob file with version 3 not created", new File(keyFile + ".3").exists());
+    }
 }


[7/9] storm git commit: Merge branch 'patch-1' of https://github.com/Ethanlm/storm into PR-2346

Posted by ka...@apache.org.
Merge branch 'patch-1' of https://github.com/Ethanlm/storm into PR-2346


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ddea2ac7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ddea2ac7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ddea2ac7

Branch: refs/heads/master
Commit: ddea2ac7e31955a6bff0e77bb08550f870af3ce0
Parents: b5be1d6 b1f98c5
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Sep 28 16:17:48 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Sep 28 16:17:48 2017 +0900

----------------------------------------------------------------------
 .../java/org/apache/storm/daemon/logviewer/LogviewerServer.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------