You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/09/28 07:21:35 UTC
[1/9] storm git commit: STORM-2084: Refactor localization to combine
files together
Repository: storm
Updated Branches:
refs/heads/master b5be1d6b0 -> 5c05bb74c
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java
deleted file mode 100644
index 70e8e9d..0000000
--- a/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java
+++ /dev/null
@@ -1,682 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.localizer;
-
-import org.apache.storm.Config;
-import org.apache.storm.DaemonConfig;
-import org.apache.storm.blobstore.BlobStoreAclHandler;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.blobstore.LocalFsBlobStore;
-import org.apache.storm.generated.AccessControl;
-import org.apache.storm.generated.AccessControlType;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.ReadableBlobMeta;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.storm.utils.ServerUtils;
-import org.apache.storm.utils.Utils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import com.google.common.base.Joiner;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.util.*;
-
-import static org.apache.storm.localizer.Localizer.USERCACHE;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-
-public class LocalizerTest {
-
- private File baseDir;
-
- private final String user1 = "user1";
- private final String user2 = "user2";
- private final String user3 = "user3";
-
- private ClientBlobStore mockblobstore = mock(ClientBlobStore.class);
-
-
- class TestLocalizer extends Localizer {
-
- TestLocalizer(Map<String, Object> conf, String baseDir) {
- super(conf, baseDir);
- }
-
- @Override
- protected ClientBlobStore getClientBlobStore() {
- return mockblobstore;
- }
- }
-
- class TestInputStreamWithMeta extends InputStreamWithMeta {
- private InputStream iostream;
-
- public TestInputStreamWithMeta() {
- iostream = IOUtils.toInputStream("some test data for my input stream");
- }
-
- public TestInputStreamWithMeta(InputStream istream) {
- iostream = istream;
- }
-
- @Override
- public long getVersion() throws IOException {
- return 1;
- }
-
- @Override
- public synchronized int read() {
- return 0;
- }
-
- @Override
- public synchronized int read(byte[] b)
- throws IOException {
- int length = iostream.read(b);
- if (length == 0) {
- return -1;
- }
- return length;
- }
-
- @Override
- public long getFileLength() {
- return 0;
- }
- };
-
- @Before
- public void setUp() throws Exception {
- baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-"+ UUID.randomUUID());
- if (!baseDir.mkdir()) {
- throw new IOException("failed to create base directory");
- }
- }
-
- @After
- public void tearDown() throws Exception {
- try {
- FileUtils.deleteDirectory(baseDir);
- } catch (IOException ignore) {}
- }
-
- protected String joinPath(String... pathList) {
- return Joiner.on(File.separator).join(pathList);
- }
-
- public String constructUserCacheDir(String base, String user) {
- return joinPath(base, USERCACHE, user);
- }
-
- public String constructExpectedFilesDir(String base, String user) {
- return joinPath(constructUserCacheDir(base, user), Localizer.FILECACHE, Localizer.FILESDIR);
- }
-
- public String constructExpectedArchivesDir(String base, String user) {
- return joinPath(constructUserCacheDir(base, user), Localizer.FILECACHE, Localizer.ARCHIVESDIR);
- }
-
- @Test
- public void testDirPaths() throws Exception {
- Map<String, Object> conf = new HashMap();
- Localizer localizer = new TestLocalizer(conf, baseDir.toString());
-
- String expectedDir = constructUserCacheDir(baseDir.toString(), user1);
- assertEquals("get local user dir doesn't return right value",
- expectedDir, localizer.getLocalUserDir(user1).toString());
-
- String expectedFileDir = joinPath(expectedDir, Localizer.FILECACHE);
- assertEquals("get local user file dir doesn't return right value",
- expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString());
- }
-
- @Test
- public void testReconstruct() throws Exception {
- Map<String, Object> conf = new HashMap();
-
- String expectedFileDir1 = constructExpectedFilesDir(baseDir.toString(), user1);
- String expectedArchiveDir1 = constructExpectedArchivesDir(baseDir.toString(), user1);
- String expectedFileDir2 = constructExpectedFilesDir(baseDir.toString(), user2);
- String expectedArchiveDir2 = constructExpectedArchivesDir(baseDir.toString(), user2);
-
- String key1 = "testfile1.txt";
- String key2 = "testfile2.txt";
- String key3 = "testfile3.txt";
- String key4 = "testfile4.txt";
-
- String archive1 = "archive1";
- String archive2 = "archive2";
-
- File user1file1 = new File(expectedFileDir1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- File user1file2 = new File(expectedFileDir1, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- File user2file3 = new File(expectedFileDir2, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- File user2file4 = new File(expectedFileDir2, key4 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-
- File user1archive1 = new File(expectedArchiveDir1, archive1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- File user2archive2 = new File(expectedArchiveDir2, archive2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- File user1archive1file = new File(user1archive1, "file1");
- File user2archive2file = new File(user2archive2, "file2");
-
- // setup some files/dirs to emulate supervisor restart
- assertTrue("Failed setup filecache dir1", new File(expectedFileDir1).mkdirs());
- assertTrue("Failed setup filecache dir2", new File(expectedFileDir2).mkdirs());
- assertTrue("Failed setup file1", user1file1.createNewFile());
- assertTrue("Failed setup file2", user1file2.createNewFile());
- assertTrue("Failed setup file3", user2file3.createNewFile());
- assertTrue("Failed setup file4", user2file4.createNewFile());
- assertTrue("Failed setup archive dir1", user1archive1.mkdirs());
- assertTrue("Failed setup archive dir2", user2archive2.mkdirs());
- assertTrue("Failed setup file in archivedir1", user1archive1file.createNewFile());
- assertTrue("Failed setup file in archivedir2", user2archive2file.createNewFile());
-
- Localizer localizer = new TestLocalizer(conf, baseDir.toString());
-
- ArrayList<LocalResource> arrUser1Keys = new ArrayList<LocalResource>();
- arrUser1Keys.add(new LocalResource(key1, false));
- arrUser1Keys.add(new LocalResource(archive1, true));
- localizer.addReferences(arrUser1Keys, user1, "topo1");
-
- LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
- assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
- assertEquals("user doesn't match", user1, lrsrcSet.getUser());
- LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
- assertNotNull("Local resource doesn't exist but should", key1rsrc);
- assertEquals("key doesn't match", key1, key1rsrc.getKey());
- assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
- LocalizedResource key2rsrc = lrsrcSet.get(key2, false);
- assertNotNull("Local resource doesn't exist but should", key2rsrc);
- assertEquals("key doesn't match", key2, key2rsrc.getKey());
- assertEquals("refcount doesn't match", 0, key2rsrc.getRefCount());
- LocalizedResource archive1rsrc = lrsrcSet.get(archive1, true);
- assertNotNull("Local resource doesn't exist but should", archive1rsrc);
- assertEquals("key doesn't match", archive1, archive1rsrc.getKey());
- assertEquals("refcount doesn't match", 1, archive1rsrc.getRefCount());
-
- LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
- assertEquals("local resource set size wrong", 3, lrsrcSet2.getSize());
- assertEquals("user doesn't match", user2, lrsrcSet2.getUser());
- LocalizedResource key3rsrc = lrsrcSet2.get(key3, false);
- assertNotNull("Local resource doesn't exist but should", key3rsrc);
- assertEquals("key doesn't match", key3, key3rsrc.getKey());
- assertEquals("refcount doesn't match", 0, key3rsrc.getRefCount());
- LocalizedResource key4rsrc = lrsrcSet2.get(key4, false);
- assertNotNull("Local resource doesn't exist but should", key4rsrc);
- assertEquals("key doesn't match", key4, key4rsrc.getKey());
- assertEquals("refcount doesn't match", 0, key4rsrc.getRefCount());
- LocalizedResource archive2rsrc = lrsrcSet2.get(archive2, true);
- assertNotNull("Local resource doesn't exist but should", archive2rsrc);
- assertEquals("key doesn't match", archive2, archive2rsrc.getKey());
- assertEquals("refcount doesn't match", 0, archive2rsrc.getRefCount());
- }
-
- @Test
- public void testArchivesTgz() throws Exception {
- testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tgz")), true, 21344);
- }
-
- @Test
- public void testArchivesZip() throws Exception {
- testArchives(getFileFromResource(joinPath("localizer", "localtest.zip")), false, 21348);
- }
-
- @Test
- public void testArchivesTarGz() throws Exception {
- testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar.gz")), true, 21344);
- }
-
- @Test
- public void testArchivesTar() throws Exception {
- testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar")), true, 21344);
- }
-
- @Test
- public void testArchivesJar() throws Exception {
- testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.jar")), false, 21416);
- }
-
- private File getFileFromResource(String archivePath) {
- ClassLoader classLoader = getClass().getClassLoader();
- return new File(classLoader.getResource(archivePath).getFile());
- }
-
- // archive passed in must contain symlink named tmptestsymlink if not a zip file
- public void testArchives(File archiveFile, boolean supportSymlinks, int size) throws Exception {
- if (Utils.isOnWindows()) {
- // Windows should set this to false cause symlink in compressed file doesn't work properly.
- supportSymlinks = false;
- }
-
- Map<String, Object> conf = new HashMap();
- // set clean time really high so doesn't kick in
- conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
-
- String key1 = archiveFile.getName();
- String topo1 = "topo1";
- Localizer localizer = new TestLocalizer(conf, baseDir.toString());
- // set really small so will do cleanup
- localizer.setTargetCacheSize(1);
-
- ReadableBlobMeta rbm = new ReadableBlobMeta();
- rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
- when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
-
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new
- FileInputStream(archiveFile.getAbsolutePath())));
-
- long timeBefore = System.nanoTime();
- File user1Dir = localizer.getLocalUserFileCacheDir(user1);
- assertTrue("failed to create user dir", user1Dir.mkdirs());
- LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, true), user1, topo1,
- user1Dir);
- long timeAfter = System.nanoTime();
-
- String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
- String expectedFileDir = joinPath(expectedUserDir, Localizer.FILECACHE, Localizer.ARCHIVESDIR);
- assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
- File keyFile = new File(expectedFileDir, key1 + ".0");
- assertTrue("blob not created", keyFile.exists());
- assertTrue("blob is not uncompressed", keyFile.isDirectory());
- File symlinkFile = new File(keyFile, "tmptestsymlink");
-
- if (supportSymlinks) {
- assertTrue("blob uncompressed doesn't contain symlink", Files.isSymbolicLink(
- symlinkFile.toPath()));
- } else {
- assertTrue("blob symlink file doesn't exist", symlinkFile.exists());
- }
-
- LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
- assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
- assertEquals("user doesn't match", user1, lrsrcSet.getUser());
- LocalizedResource key1rsrc = lrsrcSet.get(key1, true);
- assertNotNull("Local resource doesn't exist but should", key1rsrc);
- assertEquals("key doesn't match", key1, key1rsrc.getKey());
- assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
- assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePathWithVersion());
- assertEquals("size doesn't match", size, key1rsrc.getSize());
- assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
- .getLastAccessTime() <= timeAfter));
-
- timeBefore = System.nanoTime();
- localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, true);
- timeAfter = System.nanoTime();
-
- lrsrcSet = localizer.getUserResources().get(user1);
- assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
- key1rsrc = lrsrcSet.get(key1, true);
- assertNotNull("Local resource doesn't exist but should", key1rsrc);
- assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
- assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
- .getLastAccessTime() <= timeAfter));
-
- // should remove the blob since cache size set really small
- localizer.handleCacheCleanup();
-
- lrsrcSet = localizer.getUserResources().get(user1);
- assertFalse("blob contents not deleted", symlinkFile.exists());
- assertFalse("blob not deleted", keyFile.exists());
- assertFalse("blob file dir not deleted", new File(expectedFileDir).exists());
- assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
- assertNull("user set should be null", lrsrcSet);
-
- }
-
- @Test
- public void testBasic() throws Exception {
- Map<String, Object> conf = new HashMap();
- // set clean time really high so doesn't kick in
- conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
-
- String key1 = "key1";
- String topo1 = "topo1";
- Localizer localizer = new TestLocalizer(conf, baseDir.toString());
- // set really small so will do cleanup
- localizer.setTargetCacheSize(1);
-
- ReadableBlobMeta rbm = new ReadableBlobMeta();
- rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
- when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
-
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
-
- long timeBefore = System.nanoTime();
- File user1Dir = localizer.getLocalUserFileCacheDir(user1);
- assertTrue("failed to create user dir", user1Dir.mkdirs());
- LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
- user1Dir);
- long timeAfter = System.nanoTime();
-
- String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
- String expectedFileDir = joinPath(expectedUserDir, Localizer.FILECACHE, Localizer.FILESDIR);
- assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
- File keyFile = new File(expectedFileDir, key1);
- File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-
- assertTrue("blob not created", keyFileCurrentSymlink.exists());
-
- LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
- assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
- assertEquals("user doesn't match", user1, lrsrcSet.getUser());
- LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
- assertNotNull("Local resource doesn't exist but should", key1rsrc);
- assertEquals("key doesn't match", key1, key1rsrc.getKey());
- assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
- assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePath());
- assertEquals("size doesn't match", 34, key1rsrc.getSize());
- assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
- .getLastAccessTime() <= timeAfter));
-
- timeBefore = System.nanoTime();
- localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
- timeAfter = System.nanoTime();
-
- lrsrcSet = localizer.getUserResources().get(user1);
- assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
- key1rsrc = lrsrcSet.get(key1, false);
- assertNotNull("Local resource doesn't exist but should", key1rsrc);
- assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
- assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
- .getLastAccessTime() <= timeAfter));
-
- // should remove the blob since cache size set really small
- localizer.handleCacheCleanup();
-
- lrsrcSet = localizer.getUserResources().get(user1);
- assertNull("user set should be null", lrsrcSet);
- assertFalse("blob not deleted", keyFile.exists());
- assertFalse("blob dir not deleted", new File(expectedFileDir).exists());
- assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
- }
-
- @Test
- public void testMultipleKeysOneUser() throws Exception {
- Map<String, Object> conf = new HashMap();
- // set clean time really high so doesn't kick in
- conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
-
- String key1 = "key1";
- String topo1 = "topo1";
- String key2 = "key2";
- String key3 = "key3";
- Localizer localizer = new TestLocalizer(conf, baseDir.toString());
- // set to keep 2 blobs (each of size 34)
- localizer.setTargetCacheSize(68);
-
- ReadableBlobMeta rbm = new ReadableBlobMeta();
- rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
- when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
- when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta());
- when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta());
-
- List<LocalResource> keys = Arrays.asList(new LocalResource[]{new LocalResource(key1, false),
- new LocalResource(key2, false), new LocalResource(key3, false)});
- File user1Dir = localizer.getLocalUserFileCacheDir(user1);
- assertTrue("failed to create user dir", user1Dir.mkdirs());
-
- List<LocalizedResource> lrsrcs = localizer.getBlobs(keys, user1, topo1, user1Dir);
- LocalizedResource lrsrc = lrsrcs.get(0);
- LocalizedResource lrsrc2 = lrsrcs.get(1);
- LocalizedResource lrsrc3 = lrsrcs.get(2);
-
- String expectedFileDir = joinPath(baseDir.toString(), USERCACHE, user1,
- Localizer.FILECACHE, Localizer.FILESDIR);
- assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
- File keyFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- File keyFile2 = new File(expectedFileDir, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- File keyFile3 = new File(expectedFileDir, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-
- assertTrue("blob not created", keyFile.exists());
- assertTrue("blob not created", keyFile2.exists());
- assertTrue("blob not created", keyFile3.exists());
- assertEquals("size doesn't match", 34, keyFile.length());
- assertEquals("size doesn't match", 34, keyFile2.length());
- assertEquals("size doesn't match", 34, keyFile3.length());
- assertEquals("size doesn't match", 34, lrsrc.getSize());
- assertEquals("size doesn't match", 34, lrsrc3.getSize());
- assertEquals("size doesn't match", 34, lrsrc2.getSize());
-
- LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
- assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
- assertEquals("user doesn't match", user1, lrsrcSet.getUser());
-
- long timeBefore = System.nanoTime();
- localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
- localizer.removeBlobReference(lrsrc2.getKey(), user1, topo1, false);
- localizer.removeBlobReference(lrsrc3.getKey(), user1, topo1, false);
- long timeAfter = System.nanoTime();
-
- // add reference to one and then remove reference again so it has newer timestamp
- lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
- assertTrue("timestamp not within range", (lrsrc.getLastAccessTime() >= timeBefore && lrsrc
- .getLastAccessTime() <= timeAfter));
- localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
-
- // should remove the second blob first
- localizer.handleCacheCleanup();
-
- lrsrcSet = localizer.getUserResources().get(user1);
- assertEquals("local resource set size wrong", 2, lrsrcSet.getSize());
- long end = System.currentTimeMillis() + 100;
- while ((end - System.currentTimeMillis()) >= 0 && keyFile2.exists()) {
- Thread.sleep(1);
- }
- assertFalse("blob not deleted", keyFile2.exists());
- assertTrue("blob deleted", keyFile.exists());
- assertTrue("blob deleted", keyFile3.exists());
-
- // set size to cleanup another one
- localizer.setTargetCacheSize(34);
-
- // should remove the third blob
- localizer.handleCacheCleanup();
-
- lrsrcSet = localizer.getUserResources().get(user1);
- assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
- assertTrue("blob deleted", keyFile.exists());
- assertFalse("blob not deleted", keyFile3.exists());
- }
-
- @Test(expected = AuthorizationException.class)
- public void testFailAcls() throws Exception {
- Map<String, Object> conf = new HashMap();
- // set clean time really high so doesn't kick in
- conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
- // enable blobstore acl validation
- conf.put(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED, true);
-
- String topo1 = "topo1";
- String key1 = "key1";
- Localizer localizer = new TestLocalizer(conf, baseDir.toString());
-
- ReadableBlobMeta rbm = new ReadableBlobMeta();
- // set acl so user doesn't have read access
- AccessControl acl = new AccessControl(AccessControlType.USER, BlobStoreAclHandler.ADMIN);
- acl.set_name(user1);
- rbm.set_settable(new SettableBlobMeta(Arrays.asList(acl)));
- when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
- File user1Dir = localizer.getLocalUserFileCacheDir(user1);
- assertTrue("failed to create user dir", user1Dir.mkdirs());
-
- // This should throw AuthorizationException because auth failed
- localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
- }
-
- @Test(expected = KeyNotFoundException.class)
- public void testKeyNotFoundException() throws Exception {
- Map<String, Object> conf = Utils.readStormConfig();
- String key1 = "key1";
- conf.put(Config.STORM_LOCAL_DIR, "target");
- LocalFsBlobStore bs = new LocalFsBlobStore();
- LocalFsBlobStore spy = spy(bs);
- Mockito.doReturn(true).when(spy).checkForBlobOrDownload(key1);
- Mockito.doNothing().when(spy).checkForBlobUpdate(key1);
- spy.prepare(conf,null,null);
- spy.getBlob(key1, null);
- }
-
- @Test
- public void testMultipleUsers() throws Exception {
- Map<String, Object> conf = new HashMap();
- // set clean time really high so doesn't kick in
- conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
-
- String topo1 = "topo1";
- String topo2 = "topo2";
- String topo3 = "topo3";
- String key1 = "key1";
- String key2 = "key2";
- String key3 = "key3";
- Localizer localizer = new TestLocalizer(conf, baseDir.toString());
- // set to keep 2 blobs (each of size 34)
- localizer.setTargetCacheSize(68);
-
- ReadableBlobMeta rbm = new ReadableBlobMeta();
- rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
- when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
- when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta());
- when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta());
-
- File user1Dir = localizer.getLocalUserFileCacheDir(user1);
- assertTrue("failed to create user dir", user1Dir.mkdirs());
- File user2Dir = localizer.getLocalUserFileCacheDir(user2);
- assertTrue("failed to create user dir", user2Dir.mkdirs());
- File user3Dir = localizer.getLocalUserFileCacheDir(user3);
- assertTrue("failed to create user dir", user3Dir.mkdirs());
-
- LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
- user1Dir);
- LocalizedResource lrsrc2 = localizer.getBlob(new LocalResource(key2, false), user2, topo2,
- user2Dir);
- LocalizedResource lrsrc3 = localizer.getBlob(new LocalResource(key3, false), user3, topo3,
- user3Dir);
- // make sure we support different user reading same blob
- LocalizedResource lrsrc1_user3 = localizer.getBlob(new LocalResource(key1, false), user3,
- topo3, user3Dir);
-
- String expectedUserDir1 = joinPath(baseDir.toString(), USERCACHE, user1);
- String expectedFileDirUser1 = joinPath(expectedUserDir1, Localizer.FILECACHE, Localizer.FILESDIR);
- String expectedFileDirUser2 = joinPath(baseDir.toString(), USERCACHE, user2,
- Localizer.FILECACHE, Localizer.FILESDIR);
- String expectedFileDirUser3 = joinPath(baseDir.toString(), USERCACHE, user3,
- Localizer.FILECACHE, Localizer.FILESDIR);
- assertTrue("user filecache dir user1 not created", new File(expectedFileDirUser1).exists());
- assertTrue("user filecache dir user2 not created", new File(expectedFileDirUser2).exists());
- assertTrue("user filecache dir user3 not created", new File(expectedFileDirUser3).exists());
-
- File keyFile = new File(expectedFileDirUser1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- File keyFile2 = new File(expectedFileDirUser2, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- File keyFile3 = new File(expectedFileDirUser3, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- File keyFile1user3 = new File(expectedFileDirUser3, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
-
- assertTrue("blob not created", keyFile.exists());
- assertTrue("blob not created", keyFile2.exists());
- assertTrue("blob not created", keyFile3.exists());
- assertTrue("blob not created", keyFile1user3.exists());
-
- LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
- assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
- LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
- assertEquals("local resource set size wrong", 1, lrsrcSet2.getSize());
- LocalizedResourceSet lrsrcSet3 = localizer.getUserResources().get(user3);
- assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
-
- localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
- // should remove key1
- localizer.handleCacheCleanup();
-
- lrsrcSet = localizer.getUserResources().get(user1);
- lrsrcSet3 = localizer.getUserResources().get(user3);
- assertNull("user set should be null", lrsrcSet);
- assertFalse("blob dir not deleted", new File(expectedFileDirUser1).exists());
- assertFalse("blob dir not deleted", new File(expectedUserDir1).exists());
- assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
-
- assertTrue("blob deleted", keyFile2.exists());
- assertFalse("blob not deleted", keyFile.exists());
- assertTrue("blob deleted", keyFile3.exists());
- assertTrue("blob deleted", keyFile1user3.exists());
- }
-
- @Test
- public void testUpdate() throws Exception {
- Map<String, Object> conf = new HashMap();
- // set clean time really high so doesn't kick in
- conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
-
- String key1 = "key1";
- String topo1 = "topo1";
- String topo2 = "topo2";
- Localizer localizer = new TestLocalizer(conf, baseDir.toString());
-
- ReadableBlobMeta rbm = new ReadableBlobMeta();
- rbm.set_version(1);
- rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
- when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
-
- File user1Dir = localizer.getLocalUserFileCacheDir(user1);
- assertTrue("failed to create user dir", user1Dir.mkdirs());
- LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
- user1Dir);
-
- String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
- String expectedFileDir = joinPath(expectedUserDir, Localizer.FILECACHE, Localizer.FILESDIR);
- assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
- File keyFile = new File(expectedFileDir, key1);
- File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- assertTrue("blob not created", keyFileCurrentSymlink.exists());
- File versionFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_BLOB_VERSION_SUFFIX);
- assertTrue("blob version file not created", versionFile.exists());
- assertEquals("blob version not correct", 1, ServerUtils.localVersionOfBlob(keyFile.toString()));
-
- LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
- assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
-
- // test another topology getting blob with updated version - it should update version now
- rbm.set_version(2);
-
- localizer.getBlob(new LocalResource(key1, false), user1, topo2, user1Dir);
- assertTrue("blob version file not created", versionFile.exists());
- assertEquals("blob version not correct", 2, ServerUtils.localVersionOfBlob(keyFile.toString()));
- assertTrue("blob file with version 2 not created", new File(keyFile + ".2").exists());
-
- // now test regular updateBlob
- rbm.set_version(3);
-
- ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
- arr.add(new LocalResource(key1, false));
- localizer.updateBlobs(arr, user1);
- assertTrue("blob version file not created", versionFile.exists());
- assertEquals("blob version not correct", 3, ServerUtils.localVersionOfBlob(keyFile.toString()));
- assertTrue("blob file with version 3 not created", new File(keyFile + ".3").exists());
- }
-}
[6/9] storm git commit: [MINOR] fix typos in LogviewerServer
Posted by ka...@apache.org.
[MINOR] fix typos in LogviewerServer
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b1f98c54
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b1f98c54
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b1f98c54
Branch: refs/heads/master
Commit: b1f98c5442cb402cc701514fa80c213135117273
Parents: db510ae
Author: Meng Li (Ethan) <et...@gmail.com>
Authored: Tue Sep 26 14:30:55 2017 -0500
Committer: GitHub <no...@github.com>
Committed: Tue Sep 26 14:30:55 2017 -0500
----------------------------------------------------------------------
.../java/org/apache/storm/daemon/logviewer/LogviewerServer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b1f98c54/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
index 0802015..87e7925 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@ -54,7 +54,7 @@ import org.slf4j.LoggerFactory;
*/
public class LogviewerServer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(LogviewerServer.class);
- private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
+ private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("logviewer:num-shutdown-calls");
public static final String STATIC_RESOURCE_DIRECTORY_PATH = "./public";
private static Server mkHttpServer(Map<String, Object> conf) {
[8/9] storm git commit: Merge branch 'STORM-2758' of
https://github.com/Ethanlm/storm into STORM-2758-merge
Posted by ka...@apache.org.
Merge branch 'STORM-2758' of https://github.com/Ethanlm/storm into STORM-2758-merge
* fixed merge conflict by Jungtaek Lim <ka...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/99969a5b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/99969a5b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/99969a5b
Branch: refs/heads/master
Commit: 99969a5bf259c3fd0dc3ee12cc5380ed87896d18
Parents: ddea2ac ec77f66
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Sep 28 16:19:48 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Sep 28 16:19:48 2017 +0900
----------------------------------------------------------------------
.../java/org/apache/storm/daemon/logviewer/LogviewerServer.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/99969a5b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
----------------------------------------------------------------------
diff --cc storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
index 87e7925,5bceb21..77f939f
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@@ -54,8 -54,9 +54,9 @@@ import org.slf4j.LoggerFactory
*/
public class LogviewerServer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(LogviewerServer.class);
- private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
+ private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("logviewer:num-shutdown-calls");
- public static final String STATIC_RESOURCE_DIRECTORY_PATH = "./public";
+ private static final String stormHome = System.getProperty("storm.home");
+ public static final String STATIC_RESOURCE_DIRECTORY_PATH = stormHome + "/public";
private static Server mkHttpServer(Map<String, Object> conf) {
Integer logviewerHttpPort = (Integer) conf.get(DaemonConfig.LOGVIEWER_PORT);
[9/9] storm git commit: Merge branch 'STORM-2084' of
https://github.com/revans2/incubator-storm into STORM-2084-merge
Posted by ka...@apache.org.
Merge branch 'STORM-2084' of https://github.com/revans2/incubator-storm into STORM-2084-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5c05bb74
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5c05bb74
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5c05bb74
Branch: refs/heads/master
Commit: 5c05bb74cbef88dfd63ab212183c0494b9fcc985
Parents: 99969a5 78cb243
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Sep 28 16:20:32 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Sep 28 16:20:32 2017 +0900
----------------------------------------------------------------------
conf/defaults.yaml | 2 +-
.../src/jvm/org/apache/storm/StormTimer.java | 2 +-
.../org/apache/storm/testing4j_test.clj | 44 +-
.../daemon/supervisor/ReadClusterState.java | 4 +-
.../apache/storm/daemon/supervisor/Slot.java | 24 +-
.../storm/daemon/supervisor/Supervisor.java | 53 +-
.../daemon/supervisor/SupervisorUtils.java | 33 -
.../daemon/supervisor/timer/UpdateBlobs.java | 111 --
.../org/apache/storm/event/EventManagerImp.java | 2 +-
.../apache/storm/localizer/AsyncLocalizer.java | 1030 +++++++++++++++---
.../org/apache/storm/localizer/ILocalizer.java | 70 --
.../localizer/LocalDownloadedResource.java | 42 +-
.../org/apache/storm/localizer/Localizer.java | 695 ------------
.../org/apache/storm/utils/ServerUtils.java | 18 +-
.../storm/daemon/supervisor/SlotTest.java | 24 +-
.../storm/localizer/AsyncLocalizerTest.java | 699 +++++++++++-
.../apache/storm/localizer/LocalizerTest.java | 682 ------------
17 files changed, 1627 insertions(+), 1908 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5c05bb74/conf/defaults.yaml
----------------------------------------------------------------------
[4/9] storm git commit: STORM-2084: Refactor localization to combine
files together
Posted by ka...@apache.org.
STORM-2084: Refactor localization to combine files together
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/78cb243c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/78cb243c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/78cb243c
Branch: refs/heads/master
Commit: 78cb243c4bc9aaeaffd6f1c76915ac20016b32e7
Parents: 66ff5fd
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 15 13:29:40 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Sep 21 15:59:24 2017 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 2 +-
.../src/jvm/org/apache/storm/StormTimer.java | 2 +-
.../org/apache/storm/testing4j_test.clj | 44 +-
.../daemon/supervisor/ReadClusterState.java | 4 +-
.../apache/storm/daemon/supervisor/Slot.java | 24 +-
.../storm/daemon/supervisor/Supervisor.java | 53 +-
.../daemon/supervisor/SupervisorUtils.java | 33 -
.../daemon/supervisor/timer/UpdateBlobs.java | 111 --
.../org/apache/storm/event/EventManagerImp.java | 2 +-
.../apache/storm/localizer/AsyncLocalizer.java | 1030 +++++++++++++++---
.../org/apache/storm/localizer/ILocalizer.java | 70 --
.../localizer/LocalDownloadedResource.java | 42 +-
.../org/apache/storm/localizer/Localizer.java | 695 ------------
.../org/apache/storm/utils/ServerUtils.java | 18 +-
.../storm/daemon/supervisor/SlotTest.java | 24 +-
.../storm/localizer/AsyncLocalizerTest.java | 699 +++++++++++-
.../apache/storm/localizer/LocalizerTest.java | 682 ------------
17 files changed, 1627 insertions(+), 1908 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index c6ef390..679a74b 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -123,7 +123,7 @@ supervisor.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore"
supervisor.blobstore.download.thread.count: 5
supervisor.blobstore.download.max_retries: 3
supervisor.localizer.cache.target.size.mb: 10240
-supervisor.localizer.cleanup.interval.ms: 600000
+supervisor.localizer.cleanup.interval.ms: 30000
nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore"
nimbus.blobstore.expiration.secs: 600
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-client/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormTimer.java b/storm-client/src/jvm/org/apache/storm/StormTimer.java
index 4f6a7d5..b2e2b4a 100644
--- a/storm-client/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-client/src/jvm/org/apache/storm/StormTimer.java
@@ -225,7 +225,7 @@ public class StormTimer implements AutoCloseable {
*/
@Override
- public void close() throws Exception {
+ public void close() throws InterruptedException {
if (this.task.isActive()) {
this.task.setActive(false);
this.task.interrupt();
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index 87e1fc0..1b12928 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -65,13 +65,13 @@
(reify TestJob
(^void run [this ^ILocalCluster cluster]
(let [topology (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+ {"spout" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
{"2" (Thrift/prepareBoltDetails
- {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+ {(GlobalStreamId. "spout" Utils/DEFAULT_STREAM_ID)
(Thrift/prepareFieldsGrouping ["word"])}
(TestWordCounter.) (Integer. 4))
"3" (Thrift/prepareBoltDetails
- {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+ {(GlobalStreamId. "spout" Utils/DEFAULT_STREAM_ID)
(Thrift/prepareGlobalGrouping)}
(TestGlobalCount.))
"4" (Thrift/prepareBoltDetails
@@ -79,7 +79,7 @@
(Thrift/prepareGlobalGrouping)}
(TestAggregatesCounter.))})
mocked-sources (doto (MockedSources.)
- (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
+ (.addMockData "spout" (into-array Values [(Values. (into-array ["nathan"]))
(Values. (into-array ["bob"]))
(Values. (into-array ["joey"]))
(Values. (into-array ["nathan"]))])
@@ -93,7 +93,7 @@
topology
complete-topology-param)]
(is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
- (Testing/readTuples results "1")))
+ (Testing/readTuples results "spout")))
(is (Testing/multiseteq [["nathan" (int 1)] ["nathan" (int 2)] ["bob" (int 1)] ["joey" (int 1)]]
(Testing/readTuples results "2")))
(is (= [[1] [2] [3] [4]]
@@ -102,18 +102,36 @@
(Testing/readTuples results "4")))
))))
-(deftest test-complete-topology
- (doseq [zmq-on? [true false]
- :let [daemon-conf (doto (Config.)
- (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
- mk-cluster-param (doto (MkClusterParam.)
- (.setSupervisors (int 4))
- (.setDaemonConf daemon-conf))]]
+(deftest test-complete-topology-netty-simulated
+ (let [daemon-conf (doto (Config.)
+ (.put STORM-LOCAL-MODE-ZMQ true))
+ mk-cluster-param (doto (MkClusterParam.)
+ (.setSupervisors (int 4))
+ (.setDaemonConf daemon-conf))]
(Testing/withSimulatedTimeLocalCluster
- mk-cluster-param complete-topology-testjob )
+ mk-cluster-param complete-topology-testjob)))
+
+(deftest test-complete-topology-netty
+ (let [daemon-conf (doto (Config.)
+ (.put STORM-LOCAL-MODE-ZMQ true))
+ mk-cluster-param (doto (MkClusterParam.)
+ (.setSupervisors (int 4))
+ (.setDaemonConf daemon-conf))]
(Testing/withLocalCluster
mk-cluster-param complete-topology-testjob)))
+(deftest test-complete-topology-local
+ (let [mk-cluster-param (doto (MkClusterParam.)
+ (.setSupervisors (int 4)))]
+ (Testing/withLocalCluster
+ mk-cluster-param complete-topology-testjob)))
+
+(deftest test-complete-topology-local-simulated
+ (let [mk-cluster-param (doto (MkClusterParam.)
+ (.setSupervisors (int 4)))]
+ (Testing/withSimulatedTimeLocalCluster
+ mk-cluster-param complete-topology-testjob)))
+
(deftest test-with-tracked-cluster
(Testing/withTrackedCluster
(reify TestJob
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index e346a09..d68e512 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -41,7 +41,7 @@ import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
@@ -60,7 +60,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
private final AtomicInteger readRetry = new AtomicInteger(0);
private final String assignmentId;
private final ISupervisor iSuper;
- private final ILocalizer localizer;
+ private final AsyncLocalizer localizer;
private final ContainerLauncher launcher;
private final String host;
private final LocalState localState;
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index d221b71..6533d15 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -43,7 +43,7 @@ import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
-import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.LocalState;
@@ -79,7 +79,7 @@ public class Slot extends Thread implements AutoCloseable {
}
static class StaticState {
- public final ILocalizer localizer;
+ public final AsyncLocalizer localizer;
public final long hbTimeoutMs;
public final long firstHbTimeoutMs;
public final long killSleepMs;
@@ -90,10 +90,10 @@ public class Slot extends Thread implements AutoCloseable {
public final ISupervisor iSupervisor;
public final LocalState localState;
- StaticState(ILocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
- long killSleepMs, long monitorFreqMs,
- ContainerLauncher containerLauncher, String host, int port,
- ISupervisor iSupervisor, LocalState localState) {
+ StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
+ long killSleepMs, long monitorFreqMs,
+ ContainerLauncher containerLauncher, String host, int port,
+ ISupervisor iSupervisor, LocalState localState) {
this.localizer = localizer;
this.hbTimeoutMs = hbTimeoutMs;
this.firstHbTimeoutMs = firstHbTimeoutMs;
@@ -684,12 +684,12 @@ public class Slot extends Thread implements AutoCloseable {
private volatile DynamicState dynamicState;
private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments;
- public Slot(ILocalizer localizer, Map<String, Object> conf,
- ContainerLauncher containerLauncher, String host,
- int port, LocalState localState,
- IStormClusterState clusterState,
- ISupervisor iSupervisor,
- AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
+ public Slot(AsyncLocalizer localizer, Map<String, Object> conf,
+ ContainerLauncher containerLauncher, String host,
+ int port, LocalState localState,
+ IStormClusterState clusterState,
+ ISupervisor iSupervisor,
+ AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
super("SLOT_"+port);
this.cachedCurrentAssignments = cachedCurrentAssignments;
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 1f8d4c3..08d32f1 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.daemon.supervisor;
import java.io.File;
@@ -25,7 +26,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
@@ -39,19 +39,15 @@ import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
-import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
import org.apache.storm.event.EventManager;
import org.apache.storm.event.EventManagerImp;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.localizer.AsyncLocalizer;
-import org.apache.storm.localizer.ILocalizer;
-import org.apache.storm.localizer.Localizer;
import org.apache.storm.messaging.IContext;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerConfigUtils;
-import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.LocalState;
@@ -78,8 +74,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
private final StormTimer heartbeatTimer;
private final StormTimer eventTimer;
- private final StormTimer blobUpdateTimer;
- private final Localizer localizer;
private final AsyncLocalizer asyncLocalizer;
private EventManager eventManager;
private ReadClusterState readState;
@@ -110,10 +104,11 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
throw Utils.wrapInRuntime(e);
}
+ this.currAssignment = new AtomicReference<>(new HashMap<>());
+
try {
this.localState = ServerConfigUtils.supervisorState(conf);
- this.localizer = ServerUtils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
- this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer);
+ this.asyncLocalizer = new AsyncLocalizer(conf, currAssignment, localState.getLocalAssignmentsMap());
} catch (IOException e) {
throw Utils.wrapInRuntime(e);
}
@@ -126,13 +121,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
throw Utils.wrapInRuntime(e);
}
- this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>());
-
this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
-
- this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
}
public String getId() {
@@ -178,12 +169,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
return currAssignment;
}
-
- public Localizer getLocalizer() {
- return localizer;
- }
- ILocalizer getAsyncLocalizer() {
+ AsyncLocalizer getAsyncLocalizer() {
return asyncLocalizer;
}
@@ -199,8 +186,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
String path = ServerConfigUtils.supervisorTmpDir(conf);
FileUtils.cleanDirectory(new File(path));
- Localizer localizer = getLocalizer();
-
SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
hb.run();
// should synchronize supervisor so it doesn't launch anything after being down (optimization)
@@ -209,36 +194,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
this.eventManager = new EventManagerImp(false);
this.readState = new ReadClusterState(this);
-
- Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf);
- Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap();
- if (portToAssignments != null) {
- Map<String, LocalAssignment> assignments = new HashMap<>();
- for (LocalAssignment la : localState.getLocalAssignmentsMap().values()) {
- assignments.put(la.get_topology_id(), la);
- }
- for (String topoId : downloadedTopoIds) {
- LocalAssignment la = assignments.get(topoId);
- if (la != null) {
- SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner());
- } else {
- LOG.warn("Could not find an owner for topo {}", topoId);
- }
- }
- }
- // do this after adding the references so we don't try to clean things being used
- localizer.startCleaner();
- UpdateBlobs updateBlobsThread = new UpdateBlobs(this);
+ asyncLocalizer.start();
if ((Boolean) conf.get(DaemonConfig.SUPERVISOR_ENABLE)) {
// This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
// to date even if callbacks don't all work exactly right
eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));
- // Blob update thread. Starts with 30 seconds delay, every 30 seconds
- blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, eventManager));
-
// supervisor health check
eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
}
@@ -282,15 +245,13 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
this.active = false;
heartbeatTimer.close();
eventTimer.close();
- blobUpdateTimer.close();
if (eventManager != null) {
eventManager.close();
}
if (readState != null) {
readState.close();
}
- asyncLocalizer.shutdown();
- localizer.shutdown();
+ asyncLocalizer.close();
getStormClusterState().disconnect();
} catch (Exception e) {
LOG.error("Error Shutting down", e);
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 09c2b5d..33574c3 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -19,9 +19,7 @@ package org.apache.storm.daemon.supervisor;
import org.apache.storm.Config;
import org.apache.storm.generated.LSWorkerHeartbeat;
-import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.localizer.LocalResource;
-import org.apache.storm.localizer.Localizer;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
@@ -33,14 +31,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
public class SupervisorUtils {
@@ -95,34 +90,6 @@ public class SupervisorUtils {
return localResourceList;
}
- /**
- * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the
- * cache on restart.
- *
- * @param localizer
- * @param stormId
- * @param conf
- */
- static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf, String user) throws IOException {
- Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
- Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
- String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
- List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
- if (blobstoreMap != null) {
- localizer.addReferences(localresources, user, topoName);
- }
- }
-
- public static Set<String> readDownloadedTopologyIds(Map<String, Object> conf) throws IOException {
- Set<String> stormIds = new HashSet<>();
- String path = ConfigUtils.supervisorStormDistRoot(conf);
- Collection<String> rets = ConfigUtils.readDirContents(path);
- for (String ret : rets) {
- stormIds.add(URLDecoder.decode(ret));
- }
- return stormIds;
- }
-
public static Collection<String> supervisorWorkerIds(Map<String, Object> conf) {
String workerRoot = ConfigUtils.workerRoot(conf);
return ConfigUtils.readDirContents(workerRoot);
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
deleted file mode 100644
index b5dbf57..0000000
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor.timer;
-
-import java.util.HashMap;
-import org.apache.storm.Config;
-import org.apache.storm.daemon.supervisor.Supervisor;
-import org.apache.storm.daemon.supervisor.SupervisorUtils;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.LocalAssignment;
-import org.apache.storm.localizer.LocalResource;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.NimbusLeaderNotFoundException;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The
- * Runnable is intended to be run periodically by a timer, created elsewhere.
- */
-public class UpdateBlobs implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);
-
- private Supervisor supervisor;
-
- public UpdateBlobs(Supervisor supervisor) {
- this.supervisor = supervisor;
- }
-
- @Override
- public void run() {
- try {
- Map<String, Object> conf = supervisor.getConf();
- Set<String> downloadedStormIds = SupervisorUtils.readDownloadedTopologyIds(conf);
- AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisor.getCurrAssignment();
- Map<String, LocalAssignment> assignedStormIds = new HashMap<>();
- for (LocalAssignment localAssignment : newAssignment.get().values()) {
- assignedStormIds.put(localAssignment.get_topology_id(), localAssignment);
- }
- for (String stormId : downloadedStormIds) {
- LocalAssignment la = assignedStormIds.get(stormId);
- if (la != null) {
- if (la.get_owner() == null) {
- //We got a case where the local assignment is not up to date, no point in going on...
- LOG.warn("The blobs will not be updated for {} until the local assignment is updated...", stormId);
- } else {
- String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
- LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
- updateBlobsForTopology(conf, stormId, supervisor.getLocalizer(), la.get_owner());
- }
- }
- }
- } catch (Exception e) {
- if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
- LOG.error("Network error while updating blobs, will retry again later", e);
- } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
- LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
- } else {
- throw Utils.wrapInRuntime(e);
- }
- }
- }
-
- /**
- * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded.
- *
- * @param conf
- * @param stormId
- * @param localizer
- * @throws IOException
- */
- private void updateBlobsForTopology(Map<String, Object> conf, String stormId, Localizer localizer, String user) throws IOException {
- Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
- Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
- List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
- try {
- localizer.updateBlobs(localresources, user);
- } catch (AuthorizationException authExp) {
- LOG.error("AuthorizationException error", authExp);
- } catch (KeyNotFoundException knf) {
- LOG.error("KeyNotFoundException error", knf);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
index 0a64370..6b9d4f1 100644
--- a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
+++ b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
@@ -73,7 +73,7 @@ public class EventManagerImp implements EventManager {
runner.start();
}
- public void proccessInc() {
+ private void proccessInc() {
processed.incrementAndGet();
}
[5/9] storm git commit: [STORM-2758] fix: logviewer_search page not
found
Posted by ka...@apache.org.
[STORM-2758] fix: logviewer_search page not found
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ec77f66c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ec77f66c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ec77f66c
Branch: refs/heads/master
Commit: ec77f66c124e1bdb499b9b80a9778aaa2fa9f9af
Parents: db510ae
Author: Ethan Li <et...@gmail.com>
Authored: Tue Sep 26 13:32:58 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Tue Sep 26 13:42:57 2017 -0500
----------------------------------------------------------------------
.../java/org/apache/storm/daemon/logviewer/LogviewerServer.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ec77f66c/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
index 0802015..5bceb21 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@ -55,7 +55,8 @@ import org.slf4j.LoggerFactory;
public class LogviewerServer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(LogviewerServer.class);
private static final Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
- public static final String STATIC_RESOURCE_DIRECTORY_PATH = "./public";
+ private static final String stormHome = System.getProperty("storm.home");
+ public static final String STATIC_RESOURCE_DIRECTORY_PATH = stormHome + "/public";
private static Server mkHttpServer(Map<String, Object> conf) {
Integer logviewerHttpPort = (Integer) conf.get(DaemonConfig.LOGVIEWER_PORT);
[3/9] storm git commit: STORM-2084: Refactor localization to combine
files together
Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 204ab7a..913820c 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -8,121 +8,293 @@
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.localizer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.PrintWriter;
import java.net.JarURLConnection;
import java.net.URL;
import java.net.URLDecoder;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
-import org.apache.storm.utils.ServerConfigUtils;
+import org.apache.storm.DaemonConfig;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.StormTopology;
+import org.apache.storm.streams.Pair;
import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusLeaderNotFoundException;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.ShellUtils;
import org.apache.storm.utils.Utils;
+import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
/**
- * This is a wrapper around the Localizer class that provides the desired
- * async interface to Slot.
+ * Downloads and caches blobs locally.
*/
-public class AsyncLocalizer implements ILocalizer, Shutdownable {
- /**
- * A future that has already completed.
- */
- private static class AllDoneFuture implements Future<Void> {
+public class AsyncLocalizer implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
+ public static final String FILECACHE = "filecache";
+ public static final String USERCACHE = "usercache";
+ // sub directories to store either files or uncompressed archives respectively
+ public static final String FILESDIR = "files";
+ public static final String ARCHIVESDIR = "archives";
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
+ private static final String TO_UNCOMPRESS = "_tmp_";
+ private static final CompletableFuture<Void> ALL_DONE_FUTURE = new CompletableFuture<>();
+ static {
+ ALL_DONE_FUTURE.complete(null);
+ }
+
+ private static Set<String> readDownloadedTopologyIds(Map<String, Object> conf) throws IOException {
+ Set<String> stormIds = new HashSet<>();
+ String path = ConfigUtils.supervisorStormDistRoot(conf);
+ Collection<String> rets = ConfigUtils.readDirContents(path);
+ for (String ret : rets) {
+ stormIds.add(URLDecoder.decode(ret, "UTF-8"));
}
+ return stormIds;
+ }
- @Override
- public boolean isCancelled() {
- return false;
+ private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
+ private final boolean isLocalMode;
+ private final Map<String, LocalDownloadedResource> basicPending;
+ private final Map<String, LocalDownloadedResource> blobPending;
+ private final Map<String, Object> conf;
+ private final AdvancedFSOps fsOps;
+ private final boolean symlinksDisabled;
+
+ // track resources - user to resourceSet
+ private final ConcurrentMap<String, LocalizedResourceSet> userRsrc = new ConcurrentHashMap<>();
+
+ private final String localBaseDir;
+
+ private final int blobDownloadRetries;
+ private final ScheduledExecutorService execService;
+
+ // cleanup
+ private long cacheTargetSize;
+ private long cacheCleanupPeriod;
+
+
+ public AsyncLocalizer(Map<String, Object> conf, AtomicReference<Map<Long, LocalAssignment>> currAssignment,
+ Map<Integer, LocalAssignment> portToAssignments) throws IOException {
+ this(conf, ConfigUtils.supervisorLocalDir(conf), AdvancedFSOps.make(conf), currAssignment, portToAssignments);
+ }
+
+ @VisibleForTesting
+ AsyncLocalizer(Map<String, Object> conf, String baseDir, AdvancedFSOps ops,
+ AtomicReference<Map<Long, LocalAssignment>> currAssignment,
+ Map<Integer, LocalAssignment> portToAssignments) throws IOException {
+
+ this.conf = conf;
+ isLocalMode = ConfigUtils.isLocalMode(conf);
+ fsOps = ops;
+ localBaseDir = baseDir;
+ // default cache size 10GB, converted to Bytes
+ cacheTargetSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB),
+ 10 * 1024).longValue() << 20;
+ // default 30 seconds.
+ cacheCleanupPeriod = ObjectReader.getInt(conf.get(
+ DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 1000).longValue();
+
+ // if we needed we could make config for update thread pool size
+ int threadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
+ blobDownloadRetries = ObjectReader.getInt(conf.get(
+ DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
+
+ execService = Executors.newScheduledThreadPool(threadPoolSize,
+ new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor").build());
+ reconstructLocalizedResources();
+
+ symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
+ basicPending = new HashMap<>();
+ blobPending = new HashMap<>();
+ this.currAssignment = currAssignment;
+
+ recoverBlobReferences(portToAssignments);
+ }
+
+ /**
+ * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the
+ * cache on restart.
+ * @param topoId the topology id
+ * @param user the User that owns this topology
+ */
+ private void addBlobReferences(String topoId, String user) throws IOException {
+ Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, topoId);
+ @SuppressWarnings("unchecked")
+ Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+ String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
+ List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+ if (blobstoreMap != null) {
+ addReferences(localresources, user, topoName);
}
+ }
- @Override
- public boolean isDone() {
- return true;
+ /**
+ * Pick up where we left off last time.
+ * @param portToAssignments the current set of assignments for the supervisor.
+ */
+ private void recoverBlobReferences(Map<Integer, LocalAssignment> portToAssignments) throws IOException {
+ Set<String> downloadedTopoIds = readDownloadedTopologyIds(conf);
+ if (portToAssignments != null) {
+ Map<String, LocalAssignment> assignments = new HashMap<>();
+ for (LocalAssignment la : portToAssignments.values()) {
+ assignments.put(la.get_topology_id(), la);
+ }
+ for (String topoId : downloadedTopoIds) {
+ LocalAssignment la = assignments.get(topoId);
+ if (la != null) {
+ addBlobReferences(topoId, la.get_owner());
+ } else {
+ LOG.warn("Could not find an owner for topo {}", topoId);
+ }
+ }
}
+ }
- @Override
- public Void get() {
- return null;
+ /**
+ * Downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files
+ * with a suffix. The runnable is intended to be run periodically by a timer, created elsewhere.
+ */
+ private void updateBlobs() {
+ try {
+ Map<String, String> topoIdToOwner = currAssignment.get().values().stream()
+ .map((la) -> Pair.of(la.get_topology_id(), la.get_owner()))
+ .distinct()
+ .collect(Collectors.toMap((p) -> p.getFirst(), (p) -> p.getSecond()));
+ for (String topoId : readDownloadedTopologyIds(conf)) {
+ String owner = topoIdToOwner.get(topoId);
+ if (owner == null) {
+ //We got a case where the local assignment is not up to date, no point in going on...
+ LOG.warn("The blobs will not be updated for {} until the local assignment is updated...", topoId);
+ } else {
+ String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, topoId);
+ LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", topoId, stormRoot);
+ updateBlobsForTopology(conf, topoId, owner);
+ }
+ }
+ } catch (Exception e) {
+ if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
+ LOG.error("Network error while updating blobs, will retry again later", e);
+ } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
+ LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
+ } else {
+ throw Utils.wrapInRuntime(e);
+ }
}
+ }
- @Override
- public Void get(long timeout, TimeUnit unit) {
- return null;
+ /**
+ * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded.
+ */
+ private void updateBlobsForTopology(Map<String, Object> conf, String stormId, String user)
+ throws IOException {
+ Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+ Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+ List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+ try {
+ updateBlobs(localresources, user);
+ } catch (AuthorizationException authExp) {
+ LOG.error("AuthorizationException error", authExp);
+ } catch (KeyNotFoundException knf) {
+ LOG.error("KeyNotFoundException error", knf);
}
+ }
+ /**
+ * Start any background threads needed. This includes updating blobs and cleaning up
+ * unused blobs over the configured size limit.
+ */
+ public void start() {
+ execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
+ execService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
}
- private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
+ @Override
+ public void close() throws InterruptedException {
+ if (execService != null) {
+ execService.shutdown();
+ }
+ }
- private final Localizer _localizer;
- private final ExecutorService _execService;
- private final boolean _isLocalMode;
- private final Map<String, Object> _conf;
- private final Map<String, LocalDownloadedResource> _basicPending;
- private final Map<String, LocalDownloadedResource> _blobPending;
- private final AdvancedFSOps _fsOps;
- private final boolean _symlinksDisabled;
-
- private class DownloadBaseBlobsDistributed implements Callable<Void> {
- protected final String _topologyId;
- protected final File _stormRoot;
- protected final LocalAssignment _assignment;
+ //ILocalizer
+ private class DownloadBaseBlobsDistributed implements Supplier<Void> {
+ protected final String topologyId;
+ protected final File stormRoot;
+ protected final LocalAssignment assignment;
protected final String owner;
-
+
public DownloadBaseBlobsDistributed(String topologyId, LocalAssignment assignment) throws IOException {
- _topologyId = topologyId;
- _stormRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf, _topologyId));
- _assignment = assignment;
- owner = assignment.get_owner();
+ this.topologyId = topologyId;
+ stormRoot = new File(ConfigUtils.supervisorStormDistRoot(conf, this.topologyId));
+ this.assignment = assignment;
+ owner = assignment.get_owner();
}
-
+
protected void downloadBaseBlobs(File tmproot) throws Exception {
- String stormJarKey = ConfigUtils.masterStormJarKey(_topologyId);
- String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId);
- String topoConfKey = ConfigUtils.masterStormConfKey(_topologyId);
+ String stormJarKey = ConfigUtils.masterStormJarKey(topologyId);
+ String stormCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
+ String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
String jarPath = ConfigUtils.supervisorStormJarPath(tmproot.getAbsolutePath());
String codePath = ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath());
String confPath = ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath());
- _fsOps.forceMkdir(tmproot);
- _fsOps.restrictDirectoryPermissions(tmproot);
- ClientBlobStore blobStore = ServerUtils.getClientBlobStoreForSupervisor(_conf);
+ fsOps.forceMkdir(tmproot);
+ fsOps.restrictDirectoryPermissions(tmproot);
+ ClientBlobStore blobStore = ServerUtils.getClientBlobStoreForSupervisor(conf);
try {
ServerUtils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore);
ServerUtils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore);
@@ -132,71 +304,71 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
}
ServerUtils.extractDirFromJar(jarPath, ServerConfigUtils.RESOURCES_SUBDIR, tmproot);
}
-
+
@Override
- public Void call() throws Exception {
+ public Void get() {
try {
- if (_fsOps.fileExists(_stormRoot)) {
- if (!_fsOps.supportsAtomicDirectoryMove()) {
- LOG.warn("{} may have partially downloaded blobs, recovering", _topologyId);
- _fsOps.deleteIfExists(_stormRoot);
+ if (fsOps.fileExists(stormRoot)) {
+ if (!fsOps.supportsAtomicDirectoryMove()) {
+ LOG.warn("{} may have partially downloaded blobs, recovering", topologyId);
+ fsOps.deleteIfExists(stormRoot);
} else {
- LOG.warn("{} already downloaded blobs, skipping", _topologyId);
+ LOG.warn("{} already downloaded blobs, skipping", topologyId);
return null;
}
}
boolean deleteAll = true;
- String tmproot = ServerConfigUtils.supervisorTmpDir(_conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
+ String tmproot = ServerConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
File tr = new File(tmproot);
try {
downloadBaseBlobs(tr);
- if (_assignment.is_set_total_node_shared()) {
+ if (assignment.is_set_total_node_shared()) {
File sharedMemoryDirTmpLocation = new File(tr, "shared_by_topology");
//We need to create a directory for shared memory to write to (we should not encourage this though)
Path path = sharedMemoryDirTmpLocation.toPath();
Files.createDirectories(path);
}
- _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
- _fsOps.setupStormCodeDir(owner, _stormRoot);
- if (_assignment.is_set_total_node_shared()) {
- File sharedMemoryDir = new File(_stormRoot, "shared_by_topology");
- _fsOps.setupWorkerArtifactsDir(owner, sharedMemoryDir);
+ fsOps.moveDirectoryPreferAtomic(tr, stormRoot);
+ fsOps.setupStormCodeDir(owner, stormRoot);
+ if (assignment.is_set_total_node_shared()) {
+ File sharedMemoryDir = new File(stormRoot, "shared_by_topology");
+ fsOps.setupWorkerArtifactsDir(owner, sharedMemoryDir);
}
deleteAll = false;
} finally {
if (deleteAll) {
- LOG.warn("Failed to download basic resources for topology-id {}", _topologyId);
- _fsOps.deleteIfExists(tr);
- _fsOps.deleteIfExists(_stormRoot);
+ LOG.warn("Failed to download basic resources for topology-id {}", topologyId);
+ fsOps.deleteIfExists(tr);
+ fsOps.deleteIfExists(stormRoot);
}
}
return null;
} catch (Exception e) {
LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
- throw e;
+ throw new RuntimeException(e);
}
}
}
-
+
private class DownloadBaseBlobsLocal extends DownloadBaseBlobsDistributed {
public DownloadBaseBlobsLocal(String topologyId, LocalAssignment assignment) throws IOException {
super(topologyId, assignment);
}
-
+
@Override
protected void downloadBaseBlobs(File tmproot) throws Exception {
- _fsOps.forceMkdir(tmproot);
- String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId);
- String topoConfKey = ConfigUtils.masterStormConfKey(_topologyId);
+ fsOps.forceMkdir(tmproot);
+ String stormCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
+ String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
File codePath = new File(ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath()));
File confPath = new File(ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath()));
- BlobStore blobStore = ServerUtils.getNimbusBlobStore(_conf, null);
+ BlobStore blobStore = ServerUtils.getNimbusBlobStore(conf, null);
try {
- try (OutputStream codeOutStream = _fsOps.getOutputStream(codePath)){
+ try (OutputStream codeOutStream = fsOps.getOutputStream(codePath)){
blobStore.readBlobTo(stormCodeKey, codeOutStream, null);
}
- try (OutputStream confOutStream = _fsOps.getOutputStream(confPath)) {
+ try (OutputStream confOutStream = fsOps.getOutputStream(confPath)) {
blobStore.readBlobTo(topoConfKey, confOutStream, null);
}
} finally {
@@ -204,7 +376,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
}
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
- String resourcesJar = AsyncLocalizer.resourcesJar();
+ String resourcesJar = resourcesJar();
URL url = classloader.getResource(ServerConfigUtils.RESOURCES_SUBDIR);
String targetDir = tmproot + Utils.FILE_PATH_SEPARATOR;
@@ -217,26 +389,26 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
JarURLConnection urlConnection = (JarURLConnection) url.openConnection();
ServerUtils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), ServerConfigUtils.RESOURCES_SUBDIR, new File(targetDir));
} else {
- _fsOps.copyDirectory(new File(url.getFile()), new File(targetDir, ConfigUtils.RESOURCES_SUBDIR));
+ fsOps.copyDirectory(new File(url.getFile()), new File(targetDir, ConfigUtils.RESOURCES_SUBDIR));
}
}
}
}
-
- private class DownloadBlobs implements Callable<Void> {
- private final String _topologyId;
+
+ private class DownloadBlobs implements Supplier<Void> {
+ private final String topologyId;
private final String topoOwner;
public DownloadBlobs(String topologyId, String topoOwner) {
- _topologyId = topologyId;
+ this.topologyId = topologyId;
this.topoOwner = topoOwner;
}
@Override
- public Void call() throws Exception {
+ public Void get() {
try {
- String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
- Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+ String stormroot = ConfigUtils.supervisorStormDistRoot(conf, topologyId);
+ Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, topologyId);
@SuppressWarnings("unchecked")
Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
@@ -250,7 +422,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
}
}
- StormTopology stormCode = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _fsOps);
+ StormTopology stormCode = ConfigUtils.readSupervisorTopology(conf, topologyId, fsOps);
List<String> dependencies = new ArrayList<>();
if (stormCode.is_set_dependency_jars()) {
dependencies.addAll(stormCode.get_dependency_jars());
@@ -263,13 +435,13 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
}
if (!localResourceList.isEmpty()) {
- File userDir = _localizer.getLocalUserFileCacheDir(topoOwner);
- if (!_fsOps.fileExists(userDir)) {
- _fsOps.forceMkdir(userDir);
+ File userDir = getLocalUserFileCacheDir(topoOwner);
+ if (!fsOps.fileExists(userDir)) {
+ fsOps.forceMkdir(userDir);
}
- List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, topoOwner, topoName, userDir);
- _fsOps.setupBlobPermissions(userDir, topoOwner);
- if (!_symlinksDisabled) {
+ List<LocalizedResource> localizedResources = getBlobs(localResourceList, topoOwner, topoName, userDir);
+ fsOps.setupBlobPermissions(userDir, topoOwner);
+ if (!symlinksDisabled) {
for (LocalizedResource localizedResource : localizedResources) {
String keyName = localizedResource.getKey();
//The sym link we are pointing to
@@ -287,7 +459,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
// all things are from dependencies
symlinkName = keyName;
}
- _fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath);
+ fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath);
}
}
}
@@ -295,45 +467,25 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
return null;
} catch (Exception e) {
LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
- throw e;
+ throw new RuntimeException(e);
}
}
}
-
- //Visible for testing
- AsyncLocalizer(Map<String, Object> conf, Localizer localizer, AdvancedFSOps ops) {
- _conf = conf;
- _symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
- _isLocalMode = ConfigUtils.isLocalMode(conf);
- _localizer = localizer;
- _execService = Executors.newFixedThreadPool(1,
- new ThreadFactoryBuilder()
- .setNameFormat("Async Localizer")
- .build());
- _basicPending = new HashMap<>();
- _blobPending = new HashMap<>();
- _fsOps = ops;
- }
-
- public AsyncLocalizer(Map<String, Object> conf, Localizer localizer) {
- this(conf, localizer, AdvancedFSOps.make(conf));
- }
- @Override
- public synchronized Future<Void> requestDownloadBaseTopologyBlobs(final LocalAssignment assignment, final int port) throws IOException {
+ public synchronized CompletableFuture<Void> requestDownloadBaseTopologyBlobs(final LocalAssignment assignment, final int port) throws IOException {
final String topologyId = assignment.get_topology_id();
- LocalDownloadedResource localResource = _basicPending.get(topologyId);
+ LocalDownloadedResource localResource = basicPending.get(topologyId);
if (localResource == null) {
- Callable<Void> c;
- if (_isLocalMode) {
- c = new DownloadBaseBlobsLocal(topologyId, assignment);
+ Supplier<Void> supplier;
+ if (isLocalMode) {
+ supplier = new DownloadBaseBlobsLocal(topologyId, assignment);
} else {
- c = new DownloadBaseBlobsDistributed(topologyId, assignment);
+ supplier = new DownloadBaseBlobsDistributed(topologyId, assignment);
}
- localResource = new LocalDownloadedResource(_execService.submit(c));
- _basicPending.put(topologyId, localResource);
+ localResource = new LocalDownloadedResource(CompletableFuture.supplyAsync(supplier, execService));
+ basicPending.put(topologyId, localResource);
}
- Future<Void> ret = localResource.reserve(port, assignment);
+ CompletableFuture<Void> ret = localResource.reserve(port, assignment);
LOG.debug("Reserved basic {} {}", topologyId, localResource);
return ret;
}
@@ -343,7 +495,7 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
if (path == null) {
return null;
}
-
+
for (String jpath : path.split(File.pathSeparator)) {
if (jpath.endsWith(".jar")) {
if (ServerUtils.zipDoesContainDir(jpath, ServerConfigUtils.RESOURCES_SUBDIR)) {
@@ -353,63 +505,67 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
}
return null;
}
-
- @Override
+
public synchronized void recoverRunningTopology(LocalAssignment assignment, int port) {
final String topologyId = assignment.get_topology_id();
- LocalDownloadedResource localResource = _basicPending.get(topologyId);
+ LocalDownloadedResource localResource = basicPending.get(topologyId);
if (localResource == null) {
- localResource = new LocalDownloadedResource(new AllDoneFuture());
- _basicPending.put(topologyId, localResource);
+ localResource = new LocalDownloadedResource(ALL_DONE_FUTURE);
+ basicPending.put(topologyId, localResource);
}
localResource.reserve(port, assignment);
LOG.debug("Recovered basic {} {}", topologyId, localResource);
-
- localResource = _blobPending.get(topologyId);
+
+ localResource = blobPending.get(topologyId);
if (localResource == null) {
- localResource = new LocalDownloadedResource(new AllDoneFuture());
- _blobPending.put(topologyId, localResource);
+ localResource = new LocalDownloadedResource(ALL_DONE_FUTURE);
+ blobPending.put(topologyId, localResource);
}
localResource.reserve(port, assignment);
LOG.debug("Recovered blobs {} {}", topologyId, localResource);
}
-
- @Override
- public synchronized Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port) {
+
+ public synchronized CompletableFuture<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port) {
final String topologyId = assignment.get_topology_id();
- LocalDownloadedResource localResource = _blobPending.get(topologyId);
+ LocalDownloadedResource localResource = blobPending.get(topologyId);
if (localResource == null) {
- Callable<Void> c = new DownloadBlobs(topologyId, assignment.get_owner());
- localResource = new LocalDownloadedResource(_execService.submit(c));
- _blobPending.put(topologyId, localResource);
+ Supplier<Void> supplier = new DownloadBlobs(topologyId, assignment.get_owner());
+ localResource = new LocalDownloadedResource(CompletableFuture.supplyAsync(supplier, execService));
+ blobPending.put(topologyId, localResource);
}
- Future<Void> ret = localResource.reserve(port, assignment);
+ CompletableFuture<Void> ret = localResource.reserve(port, assignment);
LOG.debug("Reserved blobs {} {}", topologyId, localResource);
return ret;
}
- @Override
+ /**
+ * Remove this assignment/port as blocking resources from being cleaned up.
+ *
+ * @param assignment the assignment the resources are for
+ * @param port the port the topology is running on
+ * @throws IOException on any error
+ */
public synchronized void releaseSlotFor(LocalAssignment assignment, int port) throws IOException {
final String topologyId = assignment.get_topology_id();
LOG.debug("Releasing slot for {} {}", topologyId, port);
- LocalDownloadedResource localResource = _blobPending.get(topologyId);
+ LocalDownloadedResource localResource = blobPending.get(topologyId);
if (localResource == null || !localResource.release(port, assignment)) {
LOG.warn("Released blob reference {} {} for something that we didn't have {}", topologyId, port, localResource);
} else if (localResource.isDone()){
LOG.info("Released blob reference {} {} Cleaning up BLOB references...", topologyId, port);
- _blobPending.remove(topologyId);
- Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, topologyId);
+ blobPending.remove(topologyId);
+ Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, topologyId);
@SuppressWarnings("unchecked")
Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
if (blobstoreMap != null) {
String user = assignment.get_owner();
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
-
+
for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
String key = entry.getKey();
Map<String, Object> blobInfo = entry.getValue();
try {
- _localizer.removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo));
+ removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo));
} catch (Exception e) {
throw new IOException(e);
}
@@ -418,37 +574,585 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
} else {
LOG.debug("Released blob reference {} {} still waiting on {}", topologyId, port, localResource);
}
-
- localResource = _basicPending.get(topologyId);
+
+ localResource = basicPending.get(topologyId);
if (localResource == null || !localResource.release(port, assignment)) {
LOG.warn("Released basic reference {} {} for something that we didn't have {}", topologyId, port, localResource);
} else if (localResource.isDone()){
LOG.info("Released blob reference {} {} Cleaning up basic files...", topologyId, port);
- _basicPending.remove(topologyId);
- String path = ConfigUtils.supervisorStormDistRoot(_conf, topologyId);
- _fsOps.deleteIfExists(new File(path), null, "rmr "+topologyId);
+ basicPending.remove(topologyId);
+ String path = ConfigUtils.supervisorStormDistRoot(conf, topologyId);
+ fsOps.deleteIfExists(new File(path), null, "rmr "+topologyId);
} else {
LOG.debug("Released basic reference {} {} still waiting on {}", topologyId, port, localResource);
}
}
- @Override
public synchronized void cleanupUnusedTopologies() throws IOException {
- File distRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf));
+ File distRoot = new File(ConfigUtils.supervisorStormDistRoot(conf));
LOG.info("Cleaning up unused topologies in {}", distRoot);
File[] children = distRoot.listFiles();
if (children != null) {
for (File topoDir : children) {
String topoId = URLDecoder.decode(topoDir.getName(), "UTF-8");
- if (_basicPending.get(topoId) == null && _blobPending.get(topoId) == null) {
- _fsOps.deleteIfExists(topoDir, null, "rmr " + topoId);
+ if (basicPending.get(topoId) == null && blobPending.get(topoId) == null) {
+ fsOps.deleteIfExists(topoDir, null, "rmr " + topoId);
}
}
}
}
- @Override
- public void shutdown() {
- _execService.shutdown();
+ //From Localizer
+
+ // For testing, it allows setting size in bytes
+ protected void setTargetCacheSize(long size) {
+ cacheTargetSize = size;
+ }
+
+ // For testing, be careful as it doesn't clone
+ ConcurrentMap<String, LocalizedResourceSet> getUserResources() {
+ return userRsrc;
+ }
+
+ // baseDir/supervisor/usercache/
+ protected File getUserCacheDir() {
+ return new File(localBaseDir, USERCACHE);
+ }
+
+ // baseDir/supervisor/usercache/user1/
+ protected File getLocalUserDir(String userName) {
+ return new File(getUserCacheDir(), userName);
+ }
+
+ // baseDir/supervisor/usercache/user1/filecache
+ public File getLocalUserFileCacheDir(String userName) {
+ return new File(getLocalUserDir(userName), FILECACHE);
+ }
+
+ // baseDir/supervisor/usercache/user1/filecache/files
+ protected File getCacheDirForFiles(File dir) {
+ return new File(dir, FILESDIR);
+ }
+
+ // get the directory to put uncompressed archives in
+ // baseDir/supervisor/usercache/user1/filecache/archives
+ protected File getCacheDirForArchives(File dir) {
+ return new File(dir, ARCHIVESDIR);
+ }
+
+ protected void addLocalizedResourceInDir(String dir, LocalizedResourceSet lrsrcSet,
+ boolean uncompress) {
+ File[] lrsrcs = readCurrentBlobs(dir);
+
+ if (lrsrcs != null) {
+ for (File rsrc : lrsrcs) {
+ LOG.info("add localized in dir found: " + rsrc);
+ /// strip off .suffix
+ String path = rsrc.getPath();
+ int p = path.lastIndexOf('.');
+ if (p > 0) {
+ path = path.substring(0, p);
+ }
+ LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path);
+ LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path,
+ uncompress);
+ lrsrcSet.add(lrsrc.getKey(), lrsrc, uncompress);
+ }
+ }
+ }
+
+ // Looks for files in the directory with .current suffix
+ protected File[] readCurrentBlobs(String location) {
+ File dir = new File(location);
+ File[] files = null;
+ if (dir.exists()) {
+ files = dir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.toLowerCase().endsWith(ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ }
+ });
+ }
+ return files;
+ }
+
+ // Check to see if there are any existing files already localized.
+ protected void reconstructLocalizedResources() {
+ try {
+ LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath());
+ Collection<File> users = ConfigUtils.readDirFiles(getUserCacheDir().getPath());
+ if (!(users == null || users.isEmpty())) {
+ for (File userDir : users) {
+ String user = userDir.getName();
+ LOG.debug("looking in: {} for user: {}", userDir.getPath(), user);
+ LocalizedResourceSet newSet = new LocalizedResourceSet(user);
+ LocalizedResourceSet lrsrcSet = userRsrc.putIfAbsent(user, newSet);
+ if (lrsrcSet == null) {
+ lrsrcSet = newSet;
+ }
+ addLocalizedResourceInDir(getCacheDirForFiles(getLocalUserFileCacheDir(user)).getPath(),
+ lrsrcSet, false);
+ addLocalizedResourceInDir(
+ getCacheDirForArchives(getLocalUserFileCacheDir(user)).getPath(),
+ lrsrcSet, true);
+ }
+ } else {
+ LOG.warn("No left over resources found for any user during reconstructing of local resources at: {}", getUserCacheDir().getPath());
+ }
+ } catch (Exception e) {
+ LOG.error("ERROR reconstructing localized resources", e);
+ }
+ }
+
+ // ignores invalid user/topo/key
+ public synchronized void removeBlobReference(String key, String user, String topo,
+ boolean uncompress) throws AuthorizationException, KeyNotFoundException {
+ LocalizedResourceSet lrsrcSet = userRsrc.get(user);
+ if (lrsrcSet != null) {
+ LocalizedResource lrsrc = lrsrcSet.get(key, uncompress);
+ if (lrsrc != null) {
+ LOG.debug("removing blob reference to: {} for topo: {}", key, topo);
+ lrsrc.removeReference(topo);
+ } else {
+ LOG.warn("trying to remove non-existent blob, key: " + key + " for user: " + user +
+ " topo: " + topo);
+ }
+ } else {
+ LOG.warn("trying to remove blob for non-existent resource set for user: " + user + " key: "
+ + key + " topo: " + topo);
+ }
+ }
+
+ public synchronized void addReferences(List<LocalResource> localresource, String user,
+ String topo) {
+ LocalizedResourceSet lrsrcSet = userRsrc.get(user);
+ if (lrsrcSet != null) {
+ for (LocalResource blob : localresource) {
+ LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName(), blob.shouldUncompress());
+ if (lrsrc != null) {
+ lrsrc.addReference(topo);
+ LOG.debug("added reference for topo: {} key: {}", topo, blob);
+ } else {
+ LOG.warn("trying to add reference to non-existent blob, key: " + blob + " topo: " + topo);
+ }
+ }
+ } else {
+ LOG.warn("trying to add reference to non-existent local resource set, " +
+ "user: " + user + " topo: " + topo);
+ }
+ }
+
+ /**
+ * This function either returns the blob in the existing cache or if it doesn't exist in the
+ * cache, it will download the blob and will block until the download is complete.
+ */
+ public LocalizedResource getBlob(LocalResource localResource, String user, String topo,
+ File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException {
+ ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
+ arr.add(localResource);
+ List<LocalizedResource> results = getBlobs(arr, user, topo, userFileDir);
+ if (results.isEmpty() || results.size() != 1) {
+ throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + user +
+ ", topo: " + topo);
+ }
+ return results.get(0);
+ }
+
+ protected boolean isLocalizedResourceDownloaded(LocalizedResource lrsrc) {
+ File rsrcFileCurrent = new File(lrsrc.getCurrentSymlinkPath());
+ File rsrcFileWithVersion = new File(lrsrc.getFilePathWithVersion());
+ File versionFile = new File(lrsrc.getVersionFilePath());
+ return (rsrcFileWithVersion.exists() && rsrcFileCurrent.exists() && versionFile.exists());
+ }
+
+ protected boolean isLocalizedResourceUpToDate(LocalizedResource lrsrc,
+ ClientBlobStore blobstore) throws AuthorizationException, KeyNotFoundException {
+ String localFile = lrsrc.getFilePath();
+ long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(lrsrc.getKey(), blobstore);
+ long currentBlobVersion = ServerUtils.localVersionOfBlob(localFile);
+ return (nimbusBlobVersion == currentBlobVersion);
+ }
+
+ protected ClientBlobStore getClientBlobStore() {
+ return ServerUtils.getClientBlobStoreForSupervisor(conf);
+ }
+
+ /**
+ * This function updates blobs on the supervisor. It uses a separate thread pool and runs
+ * asynchronously of the download and delete.
+ */
+ public List<LocalizedResource> updateBlobs(List<LocalResource> localResources,
+ String user) throws AuthorizationException, KeyNotFoundException, IOException {
+ LocalizedResourceSet lrsrcSet = userRsrc.get(user);
+ ArrayList<LocalizedResource> results = new ArrayList<>();
+ ArrayList<Callable<LocalizedResource>> updates = new ArrayList<>();
+
+ if (lrsrcSet == null) {
+ // resource set must have been removed
+ return results;
+ }
+ ClientBlobStore blobstore = null;
+ try {
+ blobstore = getClientBlobStore();
+ for (LocalResource localResource: localResources) {
+ String key = localResource.getBlobName();
+ LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
+ if (lrsrc == null) {
+ LOG.warn("blob requested for update doesn't exist: {}", key);
+ continue;
+ } else if ((boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
+ LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
+ continue;
+ } else {
+ // update it if either the version isn't the latest or if any local blob files are missing
+ if (!isLocalizedResourceUpToDate(lrsrc, blobstore) ||
+ !isLocalizedResourceDownloaded(lrsrc)) {
+ LOG.debug("updating blob: {}", key);
+ updates.add(new DownloadBlob(this, conf, key, new File(lrsrc.getFilePath()), user,
+ lrsrc.isUncompressed(), true));
+ }
+ }
+ }
+ } finally {
+ if(blobstore != null) {
+ blobstore.shutdown();
+ }
+ }
+ try {
+ List<Future<LocalizedResource>> futures = execService.invokeAll(updates);
+ for (Future<LocalizedResource> futureRsrc : futures) {
+ try {
+ LocalizedResource lrsrc = futureRsrc.get();
+ // put the resource just in case it was removed at same time by the cleaner
+ LocalizedResourceSet newSet = new LocalizedResourceSet(user);
+ LocalizedResourceSet newlrsrcSet = userRsrc.putIfAbsent(user, newSet);
+ if (newlrsrcSet == null) {
+ newlrsrcSet = newSet;
+ }
+ newlrsrcSet.putIfAbsent(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+ results.add(lrsrc);
+ }
+ catch (ExecutionException e) {
+ LOG.error("Error updating blob: ", e);
+ if (e.getCause() instanceof AuthorizationException) {
+ throw (AuthorizationException)e.getCause();
+ }
+ if (e.getCause() instanceof KeyNotFoundException) {
+ throw (KeyNotFoundException)e.getCause();
+ }
+ }
+ }
+ } catch (RejectedExecutionException re) {
+ LOG.error("Error updating blobs : ", re);
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted Exception", ie);
+ }
+ return results;
+ }
+
+ /**
+ * This function either returns the blobs in the existing cache or if they don't exist in the
+ * cache, it downloads them in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT)
+ * and will block until all of them have been downloaded
+ */
+ public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources,
+ String user, String topo, File userFileDir)
+ throws AuthorizationException, KeyNotFoundException, IOException {
+ if ((boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
+ throw new KeyNotFoundException("symlinks are disabled so blobs cannot be downloaded.");
+ }
+ LocalizedResourceSet newSet = new LocalizedResourceSet(user);
+ LocalizedResourceSet lrsrcSet = userRsrc.putIfAbsent(user, newSet);
+ if (lrsrcSet == null) {
+ lrsrcSet = newSet;
+ }
+ ArrayList<LocalizedResource> results = new ArrayList<>();
+ ArrayList<Callable<LocalizedResource>> downloads = new ArrayList<>();
+
+ ClientBlobStore blobstore = null;
+ try {
+ blobstore = getClientBlobStore();
+ for (LocalResource localResource: localResources) {
+ String key = localResource.getBlobName();
+ boolean uncompress = localResource.shouldUncompress();
+ LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
+ boolean isUpdate = false;
+ if ((lrsrc != null) && (lrsrc.isUncompressed() == localResource.shouldUncompress()) &&
+ (isLocalizedResourceDownloaded(lrsrc))) {
+ if (isLocalizedResourceUpToDate(lrsrc, blobstore)) {
+ LOG.debug("blob already exists: {}", key);
+ lrsrc.addReference(topo);
+ results.add(lrsrc);
+ continue;
+ }
+ LOG.debug("blob exists but isn't up to date: {}", key);
+ isUpdate = true;
+ }
+
+ // go off to blobstore and get it
+ // assume dir passed in exists and has correct permission
+ LOG.debug("fetching blob: {}", key);
+ File downloadDir = getCacheDirForFiles(userFileDir);
+ File localFile = new File(downloadDir, key);
+ if (uncompress) {
+ // for compressed file, download to archives dir
+ downloadDir = getCacheDirForArchives(userFileDir);
+ localFile = new File(downloadDir, key);
+ }
+ downloadDir.mkdir();
+ downloads.add(new DownloadBlob(this, conf, key, localFile, user, uncompress,
+ isUpdate));
+ }
+ } finally {
+ if(blobstore !=null) {
+ blobstore.shutdown();
+ }
+ }
+ try {
+ List<Future<LocalizedResource>> futures = execService.invokeAll(downloads);
+ for (Future<LocalizedResource> futureRsrc: futures) {
+ LocalizedResource lrsrc = futureRsrc.get();
+ lrsrc.addReference(topo);
+ lrsrcSet.add(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+ results.add(lrsrc);
+ }
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof AuthorizationException)
+ throw (AuthorizationException)e.getCause();
+ else if (e.getCause() instanceof KeyNotFoundException) {
+ throw (KeyNotFoundException)e.getCause();
+ } else {
+ throw new IOException("Error getting blobs", e);
+ }
+ } catch (RejectedExecutionException re) {
+ throw new IOException("RejectedExecutionException: ", re);
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted Exception", ie);
+ }
+ return results;
+ }
+
+ static class DownloadBlob implements Callable<LocalizedResource> {
+
+ private AsyncLocalizer localizer;
+ private Map conf;
+ private String key;
+ private File localFile;
+ private String user;
+ private boolean uncompress;
+ private boolean isUpdate;
+
+ public DownloadBlob(AsyncLocalizer localizer, Map<String, Object> conf, String key, File localFile,
+ String user, boolean uncompress, boolean update) {
+ this.localizer = localizer;
+ this.conf = conf;
+ this.key = key;
+ this.localFile = localFile;
+ this.user = user;
+ this.uncompress = uncompress;
+ isUpdate = update;
+ }
+
+ @Override
+ public LocalizedResource call()
+ throws AuthorizationException, KeyNotFoundException, IOException {
+ return localizer.downloadBlob(conf, key, localFile, user, uncompress,
+ isUpdate);
+ }
+ }
+
+ private LocalizedResource downloadBlob(Map<String, Object> conf, String key, File localFile,
+ String user, boolean uncompress, boolean isUpdate)
+ throws AuthorizationException, KeyNotFoundException, IOException {
+ ClientBlobStore blobstore = null;
+ try {
+ blobstore = getClientBlobStore();
+ long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(key, blobstore);
+ long oldVersion = ServerUtils.localVersionOfBlob(localFile.toString());
+ FileOutputStream out = null;
+ PrintWriter writer = null;
+ int numTries = 0;
+ String localizedPath = localFile.toString();
+ String localFileWithVersion = ServerUtils.constructBlobWithVersionFileName(localFile.toString(),
+ nimbusBlobVersion);
+ String localVersionFile = ServerUtils.constructVersionFileName(localFile.toString());
+ String downloadFile = localFileWithVersion;
+ if (uncompress) {
+ // we need to download to temp file and then unpack into the one requested
+ downloadFile = new File(localFile.getParent(), TO_UNCOMPRESS + localFile.getName()).toString();
+ }
+ while (numTries < blobDownloadRetries) {
+ out = new FileOutputStream(downloadFile);
+ numTries++;
+ try {
+ if (!ServerUtils.canUserReadBlob(blobstore.getBlobMeta(key), user, conf)) {
+ throw new AuthorizationException(user + " does not have READ access to " + key);
+ }
+ InputStreamWithMeta in = blobstore.getBlob(key);
+ byte[] buffer = new byte[1024];
+ int len;
+ while ((len = in.read(buffer)) >= 0) {
+ out.write(buffer, 0, len);
+ }
+ out.close();
+ in.close();
+ if (uncompress) {
+ ServerUtils.unpack(new File(downloadFile), new File(localFileWithVersion));
+ LOG.debug("uncompressed " + downloadFile + " to: " + localFileWithVersion);
+ }
+
+ // Next write the version.
+ LOG.info("Blob: " + key + " updated with new Nimbus-provided version: " +
+ nimbusBlobVersion + " local version was: " + oldVersion);
+ // The false parameter ensures overwriting the version file, not appending
+ writer = new PrintWriter(
+ new BufferedWriter(new FileWriter(localVersionFile, false)));
+ writer.println(nimbusBlobVersion);
+ writer.close();
+
+ try {
+ setBlobPermissions(conf, user, localFileWithVersion);
+ setBlobPermissions(conf, user, localVersionFile);
+
+ // Update the key.current symlink. First create tmp symlink and do
+ // move of tmp to current so that the operation is atomic.
+ String tmp_uuid_local = java.util.UUID.randomUUID().toString();
+ LOG.debug("Creating a symlink @" + localFile + "." + tmp_uuid_local + " , " +
+ "linking to: " + localFile + "." + nimbusBlobVersion);
+ File uuid_symlink = new File(localFile + "." + tmp_uuid_local);
+
+ Files.createSymbolicLink(uuid_symlink.toPath(),
+ Paths.get(ServerUtils.constructBlobWithVersionFileName(localFile.toString(),
+ nimbusBlobVersion)));
+ File current_symlink = new File(ServerUtils.constructBlobCurrentSymlinkName(
+ localFile.toString()));
+ Files.move(uuid_symlink.toPath(), current_symlink.toPath(), ATOMIC_MOVE);
+ } catch (IOException e) {
+ // if we fail after writing the version file but before we move current link we need to
+ // restore the old version to the file
+ try {
+ PrintWriter restoreWriter = new PrintWriter(
+ new BufferedWriter(new FileWriter(localVersionFile, false)));
+ restoreWriter.println(oldVersion);
+ restoreWriter.close();
+ } catch (IOException ignore) {}
+ throw e;
+ }
+
+ String oldBlobFile = localFile + "." + oldVersion;
+ try {
+ // Remove the old version. Note that if a number of processes have that file open,
+ // the OS will keep the old blob file around until they all close the handle and only
+ // then deletes it. No new process will open the old blob, since the users will open the
+ // blob through the "blob.current" symlink, which always points to the latest version of
+ // a blob. Remove the old version after the current symlink is updated as to not affect
+ // anyone trying to read it.
+ if ((oldVersion != -1) && (oldVersion != nimbusBlobVersion)) {
+ LOG.info("Removing an old blob file:" + oldBlobFile);
+ Files.delete(Paths.get(oldBlobFile));
+ }
+ } catch (IOException e) {
+ // At this point we have downloaded everything and moved symlinks. If the remove of
+ // old fails just log an error
+ LOG.error("Exception removing old blob version: " + oldBlobFile);
+ }
+
+ break;
+ } catch (AuthorizationException ae) {
+ // we consider this non-retriable exceptions
+ if (out != null) {
+ out.close();
+ }
+ new File(downloadFile).delete();
+ throw ae;
+ } catch (IOException | KeyNotFoundException e) {
+ if (out != null) {
+ out.close();
+ }
+ if (writer != null) {
+ writer.close();
+ }
+ new File(downloadFile).delete();
+ if (uncompress) {
+ try {
+ FileUtils.deleteDirectory(new File(localFileWithVersion));
+ } catch (IOException ignore) {}
+ }
+ if (!isUpdate) {
+ // don't want to remove existing version file if its an update
+ new File(localVersionFile).delete();
+ }
+
+ if (numTries < blobDownloadRetries) {
+ LOG.error("Failed to download blob, retrying", e);
+ } else {
+ throw e;
+ }
+ }
+ }
+ return new LocalizedResource(key, localizedPath, uncompress);
+ } finally {
+ if(blobstore != null) {
+ blobstore.shutdown();
+ }
+ }
+ }
+
+ public void setBlobPermissions(Map<String, Object> conf, String user, String path)
+ throws IOException {
+
+ if (!ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ return;
+ }
+ String wlCommand = ObjectReader.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), "");
+ if (wlCommand.isEmpty()) {
+ String stormHome = System.getProperty("storm.home");
+ wlCommand = stormHome + "/bin/worker-launcher";
+ }
+ List<String> command = new ArrayList<String>(Arrays.asList(wlCommand, user, "blob", path));
+
+ String[] commandArray = command.toArray(new String[command.size()]);
+ ShellUtils.ShellCommandExecutor shExec = new ShellUtils.ShellCommandExecutor(commandArray);
+ LOG.info("Setting blob permissions, command: {}", Arrays.toString(commandArray));
+
+ try {
+ shExec.execute();
+ LOG.debug("output: {}", shExec.getOutput());
+ } catch (ShellUtils.ExitCodeException e) {
+ int exitCode = shExec.getExitCode();
+ LOG.warn("Exit code from worker-launcher is : " + exitCode, e);
+ LOG.debug("output: {}", shExec.getOutput());
+ throw new IOException("Setting blob permissions failed" +
+ " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
+ }
}
+
+
+ public synchronized void cleanup() {
+ LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
+ // need one large set of all and then clean via LRU
+ for (LocalizedResourceSet t : userRsrc.values()) {
+ toClean.addResources(t);
+ LOG.debug("Resources to be cleaned after adding {} : {}", t.getUser(), toClean);
+ }
+ toClean.cleanup();
+ LOG.debug("Resource cleanup: {}", toClean);
+ for (LocalizedResourceSet t : userRsrc.values()) {
+ if (t.getSize() == 0) {
+ String user = t.getUser();
+
+ LOG.debug("removing empty set: {}", user);
+ File userFileCacheDir = getLocalUserFileCacheDir(user);
+ getCacheDirForFiles(userFileCacheDir).delete();
+ getCacheDirForArchives(userFileCacheDir).delete();
+ getLocalUserFileCacheDir(user).delete();
+ boolean dirsRemoved = getLocalUserDir(user).delete();
+ // to catch race with update thread
+ if (dirsRemoved) {
+ userRsrc.remove(user);
+ }
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/localizer/ILocalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/ILocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/ILocalizer.java
deleted file mode 100644
index 7105095..0000000
--- a/storm-server/src/main/java/org/apache/storm/localizer/ILocalizer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.localizer;
-
-import java.io.IOException;
-import java.util.concurrent.Future;
-
-import org.apache.storm.generated.LocalAssignment;
-
-/**
- * Download blobs from the blob store and keep them up to date.
- */
-public interface ILocalizer {
-
- /**
- * Recover a running topology by incrementing references for what it has already downloaded.
- * @param assignment the assignment the resources are for
- * @param port the port the topology is running in.
- */
- void recoverRunningTopology(LocalAssignment assignemnt, int port);
-
- /**
- * Download storm.jar, storm.conf, and storm.ser for this topology if not done so already,
- * and inc a reference count on them.
- * @param assignment the assignment the resources are for
- * @param port the port the topology is running on
- * @return a future to let you know when they are done.
- * @throws IOException on error
- */
- Future<Void> requestDownloadBaseTopologyBlobs(LocalAssignment assignment, int port) throws IOException;
-
- /**
- * Download the blobs for this topology (reading in list in from the config)
- * and inc reference count for these blobs.
- * PRECONDITION: requestDownloadBaseTopologyBlobs has completed downloading.
- * @param assignment the assignment the resources are for
- * @param port the port the topology is running on
- * @return a future to let you know when they are done.
- */
- Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port);
-
- /**
- * dec reference count on all blobs associated with this topology.
- * @param assignment the assignment the resources are for
- * @param port the port the topology is running on
- * @throws IOException on any error
- */
- void releaseSlotFor(LocalAssignment assignment, int port) throws IOException;
-
- /**
- * Clean up any topologies that are not in use right now.
- * @throws IOException on any error.
- */
- void cleanupUnusedTopologies() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/localizer/LocalDownloadedResource.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalDownloadedResource.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalDownloadedResource.java
index 570c951..f019374 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocalDownloadedResource.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalDownloadedResource.java
@@ -19,6 +19,7 @@ package org.apache.storm.localizer;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -30,39 +31,6 @@ import org.slf4j.LoggerFactory;
public class LocalDownloadedResource {
private static final Logger LOG = LoggerFactory.getLogger(LocalDownloadedResource.class);
- private static class NoCancelFuture<T> implements Future<T> {
- private final Future<T> _wrapped;
-
- public NoCancelFuture(Future<T> wrapped) {
- _wrapped = wrapped;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- //cancel not currently supported
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return _wrapped.isDone();
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- return _wrapped.get();
- }
-
- @Override
- public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- return _wrapped.get(timeout, unit);
- }
- }
private static class PortNAssignment {
private final int _port;
private final LocalAssignment _assignment;
@@ -91,13 +59,13 @@ public class LocalDownloadedResource {
return "{"+ _port + " " + _assignment +"}";
}
}
- private final Future<Void> _pending;
+ private final CompletableFuture<Void> _pending;
private final Set<PortNAssignment> _references;
private boolean _isDone;
- public LocalDownloadedResource(Future<Void> pending) {
- _pending = new NoCancelFuture<>(pending);
+ public LocalDownloadedResource(CompletableFuture<Void> pending) {
+ _pending = pending;
_references = new HashSet<>();
_isDone = false;
}
@@ -108,7 +76,7 @@ public class LocalDownloadedResource {
* @param la the assignment this is for
* @return a future that can be used to track it being downloaded.
*/
- public synchronized Future<Void> reserve(int port, LocalAssignment la) {
+ public synchronized CompletableFuture<Void> reserve(int port, LocalAssignment la) {
PortNAssignment pna = new PortNAssignment(port, la);
if (!_references.add(pna)) {
LOG.warn("Resources {} already reserved {} for this topology", pna, _references);
[2/9] storm git commit: STORM-2084: Refactor localization to combine
files together
Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java b/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
deleted file mode 100644
index 353ab56..0000000
--- a/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
+++ /dev/null
@@ -1,695 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.localizer;
-
-import org.apache.storm.Config;
-import org.apache.storm.DaemonConfig;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.ServerUtils;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.ShellUtils.ExitCodeException;
-import org.apache.storm.utils.ShellUtils.ShellCommandExecutor;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Collection;
-import java.util.ArrayList;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-
-/**
- * Class to download and manage files from the blobstore. It uses an LRU cache
- * to determine which files to keep so they can be reused and which files to delete.
- */
-public class Localizer {
- public static final Logger LOG = LoggerFactory.getLogger(Localizer.class);
- public static final String FILECACHE = "filecache";
- public static final String USERCACHE = "usercache";
- // sub directories to store either files or uncompressed archives respectively
- public static final String FILESDIR = "files";
- public static final String ARCHIVESDIR = "archives";
-
- private static final String TO_UNCOMPRESS = "_tmp_";
-
-
-
- private final Map<String, Object> _conf;
- private final int _threadPoolSize;
- // thread pool for initial download
- private final ExecutorService _execService;
- // thread pool for updates
- private final ExecutorService _updateExecService;
- private final int _blobDownloadRetries;
-
- // track resources - user to resourceSet
- private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new
- ConcurrentHashMap<String, LocalizedResourceSet>();
-
- private final String _localBaseDir;
-
- // cleanup
- private long _cacheTargetSize;
- private long _cacheCleanupPeriod;
- private ScheduledExecutorService _cacheCleanupService;
-
- public Localizer(Map<String, Object> conf, String baseDir) {
- _conf = conf;
- _localBaseDir = baseDir;
- // default cache size 10GB, converted to Bytes
- _cacheTargetSize = ObjectReader.getInt(_conf.get(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB),
- 10 * 1024).longValue() << 20;
- // default 10 minutes.
- _cacheCleanupPeriod = ObjectReader.getInt(_conf.get(
- DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 10 * 60 * 1000).longValue();
-
- // if we needed we could make config for update thread pool size
- _threadPoolSize = ObjectReader.getInt(_conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
- _blobDownloadRetries = ObjectReader.getInt(_conf.get(
- DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
-
- _execService = Executors.newFixedThreadPool(_threadPoolSize);
- _updateExecService = Executors.newFixedThreadPool(_threadPoolSize);
- reconstructLocalizedResources();
- }
-
- // For testing, it allows setting size in bytes
- protected void setTargetCacheSize(long size) {
- _cacheTargetSize = size;
- }
-
- // For testing, be careful as it doesn't clone
- ConcurrentMap<String, LocalizedResourceSet> getUserResources() {
- return _userRsrc;
- }
-
- public void startCleaner() {
- _cacheCleanupService = new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryBuilder()
- .setNameFormat("Localizer Cache Cleanup")
- .build());
-
- _cacheCleanupService.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- handleCacheCleanup();
- }
- }, _cacheCleanupPeriod, _cacheCleanupPeriod, TimeUnit.MILLISECONDS);
- }
-
- public void shutdown() {
- if (_cacheCleanupService != null) {
- _cacheCleanupService.shutdown();
- }
- if (_execService != null) {
- _execService.shutdown();
- }
- if (_updateExecService != null) {
- _updateExecService.shutdown();
- }
- }
-
- // baseDir/supervisor/usercache/
- protected File getUserCacheDir() {
- return new File(_localBaseDir, USERCACHE);
- }
-
- // baseDir/supervisor/usercache/user1/
- protected File getLocalUserDir(String userName) {
- return new File(getUserCacheDir(), userName);
- }
-
- // baseDir/supervisor/usercache/user1/filecache
- public File getLocalUserFileCacheDir(String userName) {
- return new File(getLocalUserDir(userName), FILECACHE);
- }
-
- // baseDir/supervisor/usercache/user1/filecache/files
- protected File getCacheDirForFiles(File dir) {
- return new File(dir, FILESDIR);
- }
-
- // get the directory to put uncompressed archives in
- // baseDir/supervisor/usercache/user1/filecache/archives
- protected File getCacheDirForArchives(File dir) {
- return new File(dir, ARCHIVESDIR);
- }
-
- protected void addLocalizedResourceInDir(String dir, LocalizedResourceSet lrsrcSet,
- boolean uncompress) {
- File[] lrsrcs = readCurrentBlobs(dir);
-
- if (lrsrcs != null) {
- for (File rsrc : lrsrcs) {
- LOG.info("add localized in dir found: " + rsrc);
- /// strip off .suffix
- String path = rsrc.getPath();
- int p = path.lastIndexOf('.');
- if (p > 0) {
- path = path.substring(0, p);
- }
- LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path);
- LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path,
- uncompress);
- lrsrcSet.add(lrsrc.getKey(), lrsrc, uncompress);
- }
- }
- }
-
- // Looks for files in the directory with .current suffix
- protected File[] readCurrentBlobs(String location) {
- File dir = new File(location);
- File[] files = null;
- if (dir.exists()) {
- files = dir.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.toLowerCase().endsWith(ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- }
- });
- }
- return files;
- }
-
- // Check to see if there are any existing files already localized.
- protected void reconstructLocalizedResources() {
- try {
- LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath());
- Collection<File> users = ConfigUtils.readDirFiles(getUserCacheDir().getPath());
- if (!(users == null || users.isEmpty())) {
- for (File userDir : users) {
- String user = userDir.getName();
- LOG.debug("looking in: {} for user: {}", userDir.getPath(), user);
- LocalizedResourceSet newSet = new LocalizedResourceSet(user);
- LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
- if (lrsrcSet == null) {
- lrsrcSet = newSet;
- }
- addLocalizedResourceInDir(getCacheDirForFiles(getLocalUserFileCacheDir(user)).getPath(),
- lrsrcSet, false);
- addLocalizedResourceInDir(
- getCacheDirForArchives(getLocalUserFileCacheDir(user)).getPath(),
- lrsrcSet, true);
- }
- } else {
- LOG.warn("No left over resources found for any user during reconstructing of local resources at: {}", getUserCacheDir().getPath());
- }
- } catch (Exception e) {
- LOG.error("ERROR reconstructing localized resources", e);
- }
- }
-
- // ignores invalid user/topo/key
- public synchronized void removeBlobReference(String key, String user, String topo,
- boolean uncompress) throws AuthorizationException, KeyNotFoundException {
- LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
- if (lrsrcSet != null) {
- LocalizedResource lrsrc = lrsrcSet.get(key, uncompress);
- if (lrsrc != null) {
- LOG.debug("removing blob reference to: {} for topo: {}", key, topo);
- lrsrc.removeReference(topo);
- } else {
- LOG.warn("trying to remove non-existent blob, key: " + key + " for user: " + user +
- " topo: " + topo);
- }
- } else {
- LOG.warn("trying to remove blob for non-existent resource set for user: " + user + " key: "
- + key + " topo: " + topo);
- }
- }
-
- public synchronized void addReferences(List<LocalResource> localresource, String user,
- String topo) {
- LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
- if (lrsrcSet != null) {
- for (LocalResource blob : localresource) {
- LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName(), blob.shouldUncompress());
- if (lrsrc != null) {
- lrsrc.addReference(topo);
- LOG.debug("added reference for topo: {} key: {}", topo, blob);
- } else {
- LOG.warn("trying to add reference to non-existent blob, key: " + blob + " topo: " + topo);
- }
- }
- } else {
- LOG.warn("trying to add reference to non-existent local resource set, " +
- "user: " + user + " topo: " + topo);
- }
- }
-
- /**
- * This function either returns the blob in the existing cache or if it doesn't exist in the
- * cache, it will download the blob and will block until the download is complete.
- */
- public LocalizedResource getBlob(LocalResource localResource, String user, String topo,
- File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException {
- ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
- arr.add(localResource);
- List<LocalizedResource> results = getBlobs(arr, user, topo, userFileDir);
- if (results.isEmpty() || results.size() != 1) {
- throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + user +
- ", topo: " + topo);
- }
- return results.get(0);
- }
-
- protected boolean isLocalizedResourceDownloaded(LocalizedResource lrsrc) {
- File rsrcFileCurrent = new File(lrsrc.getCurrentSymlinkPath());
- File rsrcFileWithVersion = new File(lrsrc.getFilePathWithVersion());
- File versionFile = new File(lrsrc.getVersionFilePath());
- return (rsrcFileWithVersion.exists() && rsrcFileCurrent.exists() && versionFile.exists());
- }
-
- protected boolean isLocalizedResourceUpToDate(LocalizedResource lrsrc,
- ClientBlobStore blobstore) throws AuthorizationException, KeyNotFoundException {
- String localFile = lrsrc.getFilePath();
- long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(lrsrc.getKey(), blobstore);
- long currentBlobVersion = ServerUtils.localVersionOfBlob(localFile);
- return (nimbusBlobVersion == currentBlobVersion);
- }
-
- protected ClientBlobStore getClientBlobStore() {
- return ServerUtils.getClientBlobStoreForSupervisor(_conf);
- }
-
- /**
- * This function updates blobs on the supervisor. It uses a separate thread pool and runs
- * asynchronously of the download and delete.
- */
- public List<LocalizedResource> updateBlobs(List<LocalResource> localResources,
- String user) throws AuthorizationException, KeyNotFoundException, IOException {
- LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
- ArrayList<LocalizedResource> results = new ArrayList<>();
- ArrayList<Callable<LocalizedResource>> updates = new ArrayList<>();
-
- if (lrsrcSet == null) {
- // resource set must have been removed
- return results;
- }
- ClientBlobStore blobstore = null;
- try {
- blobstore = getClientBlobStore();
- for (LocalResource localResource: localResources) {
- String key = localResource.getBlobName();
- LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
- if (lrsrc == null) {
- LOG.warn("blob requested for update doesn't exist: {}", key);
- continue;
- } else if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
- LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
- continue;
- } else {
- // update it if either the version isn't the latest or if any local blob files are missing
- if (!isLocalizedResourceUpToDate(lrsrc, blobstore) ||
- !isLocalizedResourceDownloaded(lrsrc)) {
- LOG.debug("updating blob: {}", key);
- updates.add(new DownloadBlob(this, _conf, key, new File(lrsrc.getFilePath()), user,
- lrsrc.isUncompressed(), true));
- }
- }
- }
- } finally {
- if(blobstore != null) {
- blobstore.shutdown();
- }
- }
- try {
- List<Future<LocalizedResource>> futures = _updateExecService.invokeAll(updates);
- for (Future<LocalizedResource> futureRsrc : futures) {
- try {
- LocalizedResource lrsrc = futureRsrc.get();
- // put the resource just in case it was removed at same time by the cleaner
- LocalizedResourceSet newSet = new LocalizedResourceSet(user);
- LocalizedResourceSet newlrsrcSet = _userRsrc.putIfAbsent(user, newSet);
- if (newlrsrcSet == null) {
- newlrsrcSet = newSet;
- }
- newlrsrcSet.putIfAbsent(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
- results.add(lrsrc);
- }
- catch (ExecutionException e) {
- LOG.error("Error updating blob: ", e);
- if (e.getCause() instanceof AuthorizationException) {
- throw (AuthorizationException)e.getCause();
- }
- if (e.getCause() instanceof KeyNotFoundException) {
- throw (KeyNotFoundException)e.getCause();
- }
- }
- }
- } catch (RejectedExecutionException re) {
- LOG.error("Error updating blobs : ", re);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted Exception", ie);
- }
- return results;
- }
-
- /**
- * This function either returns the blobs in the existing cache or if they don't exist in the
- * cache, it downloads them in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT)
- * and will block until all of them have been downloaded
- */
- public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources,
- String user, String topo, File userFileDir)
- throws AuthorizationException, KeyNotFoundException, IOException {
- if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
- throw new KeyNotFoundException("symlinks are disabled so blobs cannot be downloaded.");
- }
- LocalizedResourceSet newSet = new LocalizedResourceSet(user);
- LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
- if (lrsrcSet == null) {
- lrsrcSet = newSet;
- }
- ArrayList<LocalizedResource> results = new ArrayList<>();
- ArrayList<Callable<LocalizedResource>> downloads = new ArrayList<>();
-
- ClientBlobStore blobstore = null;
- try {
- blobstore = getClientBlobStore();
- for (LocalResource localResource: localResources) {
- String key = localResource.getBlobName();
- boolean uncompress = localResource.shouldUncompress();
- LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
- boolean isUpdate = false;
- if ((lrsrc != null) && (lrsrc.isUncompressed() == localResource.shouldUncompress()) &&
- (isLocalizedResourceDownloaded(lrsrc))) {
- if (isLocalizedResourceUpToDate(lrsrc, blobstore)) {
- LOG.debug("blob already exists: {}", key);
- lrsrc.addReference(topo);
- results.add(lrsrc);
- continue;
- }
- LOG.debug("blob exists but isn't up to date: {}", key);
- isUpdate = true;
- }
-
- // go off to blobstore and get it
- // assume dir passed in exists and has correct permission
- LOG.debug("fetching blob: {}", key);
- File downloadDir = getCacheDirForFiles(userFileDir);
- File localFile = new File(downloadDir, key);
- if (uncompress) {
- // for compressed file, download to archives dir
- downloadDir = getCacheDirForArchives(userFileDir);
- localFile = new File(downloadDir, key);
- }
- downloadDir.mkdir();
- downloads.add(new DownloadBlob(this, _conf, key, localFile, user, uncompress,
- isUpdate));
- }
- } finally {
- if(blobstore !=null) {
- blobstore.shutdown();
- }
- }
- try {
- List<Future<LocalizedResource>> futures = _execService.invokeAll(downloads);
- for (Future<LocalizedResource> futureRsrc: futures) {
- LocalizedResource lrsrc = futureRsrc.get();
- lrsrc.addReference(topo);
- lrsrcSet.add(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
- results.add(lrsrc);
- }
- } catch (ExecutionException e) {
- if (e.getCause() instanceof AuthorizationException)
- throw (AuthorizationException)e.getCause();
- else if (e.getCause() instanceof KeyNotFoundException) {
- throw (KeyNotFoundException)e.getCause();
- } else {
- throw new IOException("Error getting blobs", e);
- }
- } catch (RejectedExecutionException re) {
- throw new IOException("RejectedExecutionException: ", re);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted Exception", ie);
- }
- return results;
- }
-
- static class DownloadBlob implements Callable<LocalizedResource> {
-
- private Localizer _localizer;
- private Map _conf;
- private String _key;
- private File _localFile;
- private String _user;
- private boolean _uncompress;
- private boolean _isUpdate;
-
- public DownloadBlob(Localizer localizer, Map<String, Object> conf, String key, File localFile,
- String user, boolean uncompress, boolean update) {
- _localizer = localizer;
- _conf = conf;
- _key = key;
- _localFile = localFile;
- _user = user;
- _uncompress = uncompress;
- _isUpdate = update;
- }
-
- @Override
- public LocalizedResource call()
- throws AuthorizationException, KeyNotFoundException, IOException {
- return _localizer.downloadBlob(_conf, _key, _localFile, _user, _uncompress,
- _isUpdate);
- }
- }
-
- private LocalizedResource downloadBlob(Map<String, Object> conf, String key, File localFile,
- String user, boolean uncompress, boolean isUpdate)
- throws AuthorizationException, KeyNotFoundException, IOException {
- ClientBlobStore blobstore = null;
- try {
- blobstore = getClientBlobStore();
- long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(key, blobstore);
- long oldVersion = ServerUtils.localVersionOfBlob(localFile.toString());
- FileOutputStream out = null;
- PrintWriter writer = null;
- int numTries = 0;
- String localizedPath = localFile.toString();
- String localFileWithVersion = ServerUtils.constructBlobWithVersionFileName(localFile.toString(),
- nimbusBlobVersion);
- String localVersionFile = ServerUtils.constructVersionFileName(localFile.toString());
- String downloadFile = localFileWithVersion;
- if (uncompress) {
- // we need to download to temp file and then unpack into the one requested
- downloadFile = new File(localFile.getParent(), TO_UNCOMPRESS + localFile.getName()).toString();
- }
- while (numTries < _blobDownloadRetries) {
- out = new FileOutputStream(downloadFile);
- numTries++;
- try {
- if (!ServerUtils.canUserReadBlob(blobstore.getBlobMeta(key), user, conf)) {
- throw new AuthorizationException(user + " does not have READ access to " + key);
- }
- InputStreamWithMeta in = blobstore.getBlob(key);
- byte[] buffer = new byte[1024];
- int len;
- while ((len = in.read(buffer)) >= 0) {
- out.write(buffer, 0, len);
- }
- out.close();
- in.close();
- if (uncompress) {
- ServerUtils.unpack(new File(downloadFile), new File(localFileWithVersion));
- LOG.debug("uncompressed " + downloadFile + " to: " + localFileWithVersion);
- }
-
- // Next write the version.
- LOG.info("Blob: " + key + " updated with new Nimbus-provided version: " +
- nimbusBlobVersion + " local version was: " + oldVersion);
- // The false parameter ensures overwriting the version file, not appending
- writer = new PrintWriter(
- new BufferedWriter(new FileWriter(localVersionFile, false)));
- writer.println(nimbusBlobVersion);
- writer.close();
-
- try {
- setBlobPermissions(conf, user, localFileWithVersion);
- setBlobPermissions(conf, user, localVersionFile);
-
- // Update the key.current symlink. First create tmp symlink and do
- // move of tmp to current so that the operation is atomic.
- String tmp_uuid_local = java.util.UUID.randomUUID().toString();
- LOG.debug("Creating a symlink @" + localFile + "." + tmp_uuid_local + " , " +
- "linking to: " + localFile + "." + nimbusBlobVersion);
- File uuid_symlink = new File(localFile + "." + tmp_uuid_local);
-
- Files.createSymbolicLink(uuid_symlink.toPath(),
- Paths.get(ServerUtils.constructBlobWithVersionFileName(localFile.toString(),
- nimbusBlobVersion)));
- File current_symlink = new File(ServerUtils.constructBlobCurrentSymlinkName(
- localFile.toString()));
- Files.move(uuid_symlink.toPath(), current_symlink.toPath(), ATOMIC_MOVE);
- } catch (IOException e) {
- // if we fail after writing the version file but before we move current link we need to
- // restore the old version to the file
- try {
- PrintWriter restoreWriter = new PrintWriter(
- new BufferedWriter(new FileWriter(localVersionFile, false)));
- restoreWriter.println(oldVersion);
- restoreWriter.close();
- } catch (IOException ignore) {}
- throw e;
- }
-
- String oldBlobFile = localFile + "." + oldVersion;
- try {
- // Remove the old version. Note that if a number of processes have that file open,
- // the OS will keep the old blob file around until they all close the handle and only
- // then deletes it. No new process will open the old blob, since the users will open the
- // blob through the "blob.current" symlink, which always points to the latest version of
- // a blob. Remove the old version after the current symlink is updated as to not affect
- // anyone trying to read it.
- if ((oldVersion != -1) && (oldVersion != nimbusBlobVersion)) {
- LOG.info("Removing an old blob file:" + oldBlobFile);
- Files.delete(Paths.get(oldBlobFile));
- }
- } catch (IOException e) {
- // At this point we have downloaded everything and moved symlinks. If the remove of
- // old fails just log an error
- LOG.error("Exception removing old blob version: " + oldBlobFile);
- }
-
- break;
- } catch (AuthorizationException ae) {
- // we consider this non-retriable exceptions
- if (out != null) {
- out.close();
- }
- new File(downloadFile).delete();
- throw ae;
- } catch (IOException | KeyNotFoundException e) {
- if (out != null) {
- out.close();
- }
- if (writer != null) {
- writer.close();
- }
- new File(downloadFile).delete();
- if (uncompress) {
- try {
- FileUtils.deleteDirectory(new File(localFileWithVersion));
- } catch (IOException ignore) {}
- }
- if (!isUpdate) {
- // don't want to remove existing version file if its an update
- new File(localVersionFile).delete();
- }
-
- if (numTries < _blobDownloadRetries) {
- LOG.error("Failed to download blob, retrying", e);
- } else {
- throw e;
- }
- }
- }
- return new LocalizedResource(key, localizedPath, uncompress);
- } finally {
- if(blobstore != null) {
- blobstore.shutdown();
- }
- }
- }
-
- public void setBlobPermissions(Map<String, Object> conf, String user, String path)
- throws IOException {
-
- if (!ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
- return;
- }
- String wlCommand = ObjectReader.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), "");
- if (wlCommand.isEmpty()) {
- String stormHome = System.getProperty("storm.home");
- wlCommand = stormHome + "/bin/worker-launcher";
- }
- List<String> command = new ArrayList<String>(Arrays.asList(wlCommand, user, "blob", path));
-
- String[] commandArray = command.toArray(new String[command.size()]);
- ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
- LOG.info("Setting blob permissions, command: {}", Arrays.toString(commandArray));
-
- try {
- shExec.execute();
- LOG.debug("output: {}", shExec.getOutput());
- } catch (ExitCodeException e) {
- int exitCode = shExec.getExitCode();
- LOG.warn("Exit code from worker-launcher is : " + exitCode, e);
- LOG.debug("output: {}", shExec.getOutput());
- throw new IOException("Setting blob permissions failed" +
- " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
- }
- }
-
-
- public synchronized void handleCacheCleanup() {
- LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(_cacheTargetSize);
- // need one large set of all and then clean via LRU
- for (LocalizedResourceSet t : _userRsrc.values()) {
- toClean.addResources(t);
- LOG.debug("Resources to be cleaned after adding {} : {}", t.getUser(), toClean);
- }
- toClean.cleanup();
- LOG.debug("Resource cleanup: {}", toClean);
- for (LocalizedResourceSet t : _userRsrc.values()) {
- if (t.getSize() == 0) {
- String user = t.getUser();
-
- LOG.debug("removing empty set: {}", user);
- File userFileCacheDir = getLocalUserFileCacheDir(user);
- getCacheDirForFiles(userFileCacheDir).delete();
- getCacheDirForArchives(userFileCacheDir).delete();
- getLocalUserFileCacheDir(user).delete();
- boolean dirsRemoved = getLocalUserDir(user).delete();
- // to catch race with update thread
- if (dirsRemoved) {
- _userRsrc.remove(user);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 311acda..7b127d2 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -33,23 +33,26 @@ import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.blobstore.LocalFsBlobStore;
import org.apache.storm.daemon.StormCommon;
-import org.apache.storm.generated.*;
-import org.apache.storm.localizer.Localizer;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.thrift.TException;
-import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
-import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
@@ -61,7 +64,6 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@@ -285,10 +287,6 @@ public class ServerUtils {
return Files.getOwner(FileSystems.getDefault().getPath(path)).getName();
}
- public static Localizer createLocalizer(Map<String, Object> conf, String baseDir) {
- return new Localizer(conf, baseDir);
- }
-
public static String containerFilePath (String dir) {
return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
}
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index eb25566..60b628e 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -25,7 +25,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.storm.daemon.supervisor.Slot.StaticState;
@@ -39,7 +39,7 @@ import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
@@ -115,7 +115,7 @@ public class SlotTest {
@Test
public void testEmptyToEmpty() throws Exception {
try (SimulatedTime t = new SimulatedTime(1010)){
- ILocalizer localizer = mock(ILocalizer.class);
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
LocalState state = mock(LocalState.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
ISupervisor iSuper = mock(ISupervisor.class);
@@ -137,7 +137,7 @@ public class SlotTest {
LocalAssignment newAssignment =
mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0));
- ILocalizer localizer = mock(ILocalizer.class);
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
Container container = mock(Container.class);
LocalState state = mock(LocalState.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -146,11 +146,11 @@ public class SlotTest {
when(container.readHeartbeat()).thenReturn(hb, hb);
@SuppressWarnings("unchecked")
- Future<Void> baseFuture = mock(Future.class);
+ CompletableFuture<Void> baseFuture = mock(CompletableFuture.class);
when(localizer.requestDownloadBaseTopologyBlobs(newAssignment, port)).thenReturn(baseFuture);
@SuppressWarnings("unchecked")
- Future<Void> blobFuture = mock(Future.class);
+ CompletableFuture<Void> blobFuture = mock(CompletableFuture.class);
when(localizer.requestDownloadTopologyBlobs(newAssignment, port)).thenReturn(blobFuture);
ISupervisor iSuper = mock(ISupervisor.class);
@@ -220,7 +220,7 @@ public class SlotTest {
LocalAssignment assignment =
mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0));
- ILocalizer localizer = mock(ILocalizer.class);
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
Container container = mock(Container.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs()-10);
@@ -276,7 +276,7 @@ public class SlotTest {
LocalAssignment nAssignment =
mkLocalAssignment(nTopoId, nExecList, mkWorkerResources(100.0, 100.0, 100.0));
- ILocalizer localizer = mock(ILocalizer.class);
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
Container nContainer = mock(Container.class);
LocalState state = mock(LocalState.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -285,11 +285,11 @@ public class SlotTest {
when(nContainer.readHeartbeat()).thenReturn(nhb, nhb);
@SuppressWarnings("unchecked")
- Future<Void> baseFuture = mock(Future.class);
+ CompletableFuture<Void> baseFuture = mock(CompletableFuture.class);
when(localizer.requestDownloadBaseTopologyBlobs(nAssignment, port)).thenReturn(baseFuture);
@SuppressWarnings("unchecked")
- Future<Void> blobFuture = mock(Future.class);
+ CompletableFuture<Void> blobFuture = mock(CompletableFuture.class);
when(localizer.requestDownloadTopologyBlobs(nAssignment, port)).thenReturn(blobFuture);
ISupervisor iSuper = mock(ISupervisor.class);
@@ -377,7 +377,7 @@ public class SlotTest {
when(cContainer.readHeartbeat()).thenReturn(chb);
when(cContainer.areAllProcessesDead()).thenReturn(false, true);
- ILocalizer localizer = mock(ILocalizer.class);
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
ISupervisor iSuper = mock(ISupervisor.class);
@@ -437,7 +437,7 @@ public class SlotTest {
when(cContainer.readHeartbeat()).thenReturn(chb, chb, chb, chb, chb, chb);
when(cContainer.runProfiling(any(ProfileRequest.class), anyBoolean())).thenReturn(true);
- ILocalizer localizer = mock(ILocalizer.class);
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
ISupervisor iSuper = mock(ISupervisor.class);
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 2461b33..f49be63 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -15,24 +15,51 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.localizer;
+import static org.apache.storm.localizer.AsyncLocalizer.USERCACHE;
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
+import com.google.common.base.Joiner;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.storm.DaemonConfig;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.blobstore.LocalFsBlobStore;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Utils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.apache.storm.Config;
@@ -41,9 +68,16 @@ import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.security.auth.DefaultPrincipalToLocal;
+import org.mockito.Mockito;
public class AsyncLocalizerTest {
+ private static String getTestLocalizerRoot() {
+ File f = new File("./target/" + Thread.currentThread().getStackTrace()[2].getMethodName() + "/localizer/");
+ f.deleteOnExit();
+ return f.getPath();
+ }
+
@Test
public void testRequestDownloadBaseTopologyBlobs() throws Exception {
final String topoId = "TOPO";
@@ -68,7 +102,6 @@ public class AsyncLocalizerTest {
conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName());
conf.put(Config.STORM_CLUSTER_MODE, "distributed");
conf.put(Config.STORM_LOCAL_DIR, stormLocal);
- Localizer localizer = mock(Localizer.class);
AdvancedFSOps ops = mock(AdvancedFSOps.class);
ConfigUtils mockedCU = mock(ConfigUtils.class);
ReflectionUtils mockedRU = mock(ReflectionUtils.class);
@@ -76,7 +109,7 @@ public class AsyncLocalizerTest {
Map<String, Object> topoConf = new HashMap<>(conf);
- AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops);
+ AsyncLocalizer bl = new AsyncLocalizer(conf, getTestLocalizerRoot(), ops, new AtomicReference<>(new HashMap<>()), null);
ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
ReflectionUtils origRU = ReflectionUtils.setInstance(mockedRU);
ServerUtils origUtils = ServerUtils.setInstance(mockedU);
@@ -86,7 +119,7 @@ public class AsyncLocalizerTest {
when(mockedRU.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);
when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
- Future<Void> f = al.requestDownloadBaseTopologyBlobs(la, port);
+ Future<Void> f = bl.requestDownloadBaseTopologyBlobs(la, port);
f.get(20, TimeUnit.SECONDS);
// We should be done now...
@@ -102,7 +135,7 @@ public class AsyncLocalizerTest {
verify(ops, never()).deleteIfExists(any(File.class));
} finally {
- al.shutdown();
+ bl.close();
ConfigUtils.setInstance(orig);
ReflectionUtils.setInstance(origRU);
ServerUtils.setInstance(origUtils);
@@ -129,7 +162,7 @@ public class AsyncLocalizerTest {
final File userDir = new File(stormLocal, user);
final String stormRoot = stormLocal+topoId+"/";
- final String localizerRoot = "/tmp/storm-localizer/";
+ final String localizerRoot = getTestLocalizerRoot();
final String simpleLocalFile = localizerRoot + user + "/simple";
final String simpleCurrentLocalFile = localizerRoot + user + "/simple.current";
@@ -146,7 +179,6 @@ public class AsyncLocalizerTest {
Map<String, Object> conf = new HashMap<>();
conf.put(Config.STORM_LOCAL_DIR, stormLocal);
- Localizer localizer = mock(Localizer.class);
AdvancedFSOps ops = mock(AdvancedFSOps.class);
ConfigUtils mockedCU = mock(ConfigUtils.class);
@@ -157,33 +189,662 @@ public class AsyncLocalizerTest {
List<LocalizedResource> localizedList = new ArrayList<>();
LocalizedResource simpleLocal = new LocalizedResource(simpleKey, simpleLocalFile, false);
localizedList.add(simpleLocal);
-
- AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops);
+
+ AsyncLocalizer bl = spy(new AsyncLocalizer(conf, localizerRoot, ops, new AtomicReference<>(new HashMap<>()), null));
ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
try {
when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot);
when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
when(mockedCU.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(st);
-
- when(localizer.getLocalUserFileCacheDir(user)).thenReturn(userDir);
-
- when(localizer.getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir))).thenReturn(localizedList);
-
- Future<Void> f = al.requestDownloadTopologyBlobs(la, port);
+
+ //Write the mocking backwards so the actual method is not called on the spy object
+ doReturn(userDir).when(bl).getLocalUserFileCacheDir(user);
+ doReturn(localizedList).when(bl).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
+
+ Future<Void> f = bl.requestDownloadTopologyBlobs(la, port);
f.get(20, TimeUnit.SECONDS);
// We should be done now...
-
- verify(localizer).getLocalUserFileCacheDir(user);
+
+ verify(bl).getLocalUserFileCacheDir(user);
+
verify(ops).fileExists(userDir);
verify(ops).forceMkdir(userDir);
-
- verify(localizer).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
+
+ verify(bl).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
verify(ops).createSymlink(new File(stormRoot, simpleLocalName), new File(simpleCurrentLocalFile));
} finally {
- al.shutdown();
+ bl.close();
ConfigUtils.setInstance(orig);
}
}
+
+ //From LocalizerTest
+ private File baseDir;
+
+ private final String user1 = "user1";
+ private final String user2 = "user2";
+ private final String user3 = "user3";
+
+ private ClientBlobStore mockblobstore = mock(ClientBlobStore.class);
+
+
+ class TestLocalizer extends AsyncLocalizer {
+
+ TestLocalizer(Map<String, Object> conf, String baseDir) throws IOException {
+ super(conf, baseDir, AdvancedFSOps.make(conf), new AtomicReference<>(new HashMap<>()), null);
+ }
+
+ @Override
+ protected ClientBlobStore getClientBlobStore() {
+ return mockblobstore;
+ }
+ }
+
+ class TestInputStreamWithMeta extends InputStreamWithMeta {
+ private InputStream iostream;
+
+ public TestInputStreamWithMeta() {
+ iostream = IOUtils.toInputStream("some test data for my input stream");
+ }
+
+ public TestInputStreamWithMeta(InputStream istream) {
+ iostream = istream;
+ }
+
+ @Override
+ public long getVersion() throws IOException {
+ return 1;
+ }
+
+ @Override
+ public synchronized int read() {
+ return 0;
+ }
+
+ @Override
+ public synchronized int read(byte[] b)
+ throws IOException {
+ int length = iostream.read(b);
+ if (length == 0) {
+ return -1;
+ }
+ return length;
+ }
+
+ @Override
+ public long getFileLength() {
+ return 0;
+ }
+ };
+
+ @Before
+ public void setUp() throws Exception {
+ baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-"+ UUID.randomUUID());
+ if (!baseDir.mkdir()) {
+ throw new IOException("failed to create base directory");
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ FileUtils.deleteDirectory(baseDir);
+ } catch (IOException ignore) {}
+ }
+
+ protected String joinPath(String... pathList) {
+ return Joiner.on(File.separator).join(pathList);
+ }
+
+ public String constructUserCacheDir(String base, String user) {
+ return joinPath(base, USERCACHE, user);
+ }
+
+ public String constructExpectedFilesDir(String base, String user) {
+ return joinPath(constructUserCacheDir(base, user), AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+ }
+
+ public String constructExpectedArchivesDir(String base, String user) {
+ return joinPath(constructUserCacheDir(base, user), AsyncLocalizer.FILECACHE, AsyncLocalizer.ARCHIVESDIR);
+ }
+
+ @Test
+ public void testDirPaths() throws Exception {
+ Map<String, Object> conf = new HashMap();
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+ String expectedDir = constructUserCacheDir(baseDir.toString(), user1);
+ assertEquals("get local user dir doesn't return right value",
+ expectedDir, localizer.getLocalUserDir(user1).toString());
+
+ String expectedFileDir = joinPath(expectedDir, AsyncLocalizer.FILECACHE);
+ assertEquals("get local user file dir doesn't return right value",
+ expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString());
+ }
+
+ @Test
+ public void testReconstruct() throws Exception {
+ Map<String, Object> conf = new HashMap();
+
+ String expectedFileDir1 = constructExpectedFilesDir(baseDir.toString(), user1);
+ String expectedArchiveDir1 = constructExpectedArchivesDir(baseDir.toString(), user1);
+ String expectedFileDir2 = constructExpectedFilesDir(baseDir.toString(), user2);
+ String expectedArchiveDir2 = constructExpectedArchivesDir(baseDir.toString(), user2);
+
+ String key1 = "testfile1.txt";
+ String key2 = "testfile2.txt";
+ String key3 = "testfile3.txt";
+ String key4 = "testfile4.txt";
+
+ String archive1 = "archive1";
+ String archive2 = "archive2";
+
+ File user1file1 = new File(expectedFileDir1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File user1file2 = new File(expectedFileDir1, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File user2file3 = new File(expectedFileDir2, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File user2file4 = new File(expectedFileDir2, key4 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+ File user1archive1 = new File(expectedArchiveDir1, archive1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File user2archive2 = new File(expectedArchiveDir2, archive2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File user1archive1file = new File(user1archive1, "file1");
+ File user2archive2file = new File(user2archive2, "file2");
+
+ // setup some files/dirs to emulate supervisor restart
+ assertTrue("Failed setup filecache dir1", new File(expectedFileDir1).mkdirs());
+ assertTrue("Failed setup filecache dir2", new File(expectedFileDir2).mkdirs());
+ assertTrue("Failed setup file1", user1file1.createNewFile());
+ assertTrue("Failed setup file2", user1file2.createNewFile());
+ assertTrue("Failed setup file3", user2file3.createNewFile());
+ assertTrue("Failed setup file4", user2file4.createNewFile());
+ assertTrue("Failed setup archive dir1", user1archive1.mkdirs());
+ assertTrue("Failed setup archive dir2", user2archive2.mkdirs());
+ assertTrue("Failed setup file in archivedir1", user1archive1file.createNewFile());
+ assertTrue("Failed setup file in archivedir2", user2archive2file.createNewFile());
+
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+ ArrayList<LocalResource> arrUser1Keys = new ArrayList<LocalResource>();
+ arrUser1Keys.add(new LocalResource(key1, false));
+ arrUser1Keys.add(new LocalResource(archive1, true));
+ localizer.addReferences(arrUser1Keys, user1, "topo1");
+
+ LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
+ assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+ LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
+ assertNotNull("Local resource doesn't exist but should", key1rsrc);
+ assertEquals("key doesn't match", key1, key1rsrc.getKey());
+ assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+ LocalizedResource key2rsrc = lrsrcSet.get(key2, false);
+ assertNotNull("Local resource doesn't exist but should", key2rsrc);
+ assertEquals("key doesn't match", key2, key2rsrc.getKey());
+ assertEquals("refcount doesn't match", 0, key2rsrc.getRefCount());
+ LocalizedResource archive1rsrc = lrsrcSet.get(archive1, true);
+ assertNotNull("Local resource doesn't exist but should", archive1rsrc);
+ assertEquals("key doesn't match", archive1, archive1rsrc.getKey());
+ assertEquals("refcount doesn't match", 1, archive1rsrc.getRefCount());
+
+ LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
+ assertEquals("local resource set size wrong", 3, lrsrcSet2.getSize());
+ assertEquals("user doesn't match", user2, lrsrcSet2.getUser());
+ LocalizedResource key3rsrc = lrsrcSet2.get(key3, false);
+ assertNotNull("Local resource doesn't exist but should", key3rsrc);
+ assertEquals("key doesn't match", key3, key3rsrc.getKey());
+ assertEquals("refcount doesn't match", 0, key3rsrc.getRefCount());
+ LocalizedResource key4rsrc = lrsrcSet2.get(key4, false);
+ assertNotNull("Local resource doesn't exist but should", key4rsrc);
+ assertEquals("key doesn't match", key4, key4rsrc.getKey());
+ assertEquals("refcount doesn't match", 0, key4rsrc.getRefCount());
+ LocalizedResource archive2rsrc = lrsrcSet2.get(archive2, true);
+ assertNotNull("Local resource doesn't exist but should", archive2rsrc);
+ assertEquals("key doesn't match", archive2, archive2rsrc.getKey());
+ assertEquals("refcount doesn't match", 0, archive2rsrc.getRefCount());
+ }
+
+ @Test
+ public void testArchivesTgz() throws Exception {
+ testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tgz")), true, 21344);
+ }
+
+ @Test
+ public void testArchivesZip() throws Exception {
+ testArchives(getFileFromResource(joinPath("localizer", "localtest.zip")), false, 21348);
+ }
+
+ @Test
+ public void testArchivesTarGz() throws Exception {
+ testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar.gz")), true, 21344);
+ }
+
+ @Test
+ public void testArchivesTar() throws Exception {
+ testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar")), true, 21344);
+ }
+
+ @Test
+ public void testArchivesJar() throws Exception {
+ testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.jar")), false, 21416);
+ }
+
+ private File getFileFromResource(String archivePath) {
+ ClassLoader classLoader = getClass().getClassLoader();
+ return new File(classLoader.getResource(archivePath).getFile());
+ }
+
+ // archive passed in must contain symlink named tmptestsymlink if not a zip file
+ public void testArchives(File archiveFile, boolean supportSymlinks, int size) throws Exception {
+ if (Utils.isOnWindows()) {
+ // Windows should set this to false cause symlink in compressed file doesn't work properly.
+ supportSymlinks = false;
+ }
+
+ Map<String, Object> conf = new HashMap();
+ // set clean time really high so doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+ String key1 = archiveFile.getName();
+ String topo1 = "topo1";
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ // set really small so will do cleanup
+ localizer.setTargetCacheSize(1);
+
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+ when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+
+ when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new
+ FileInputStream(archiveFile.getAbsolutePath())));
+
+ long timeBefore = System.nanoTime();
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
+ LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, true), user1, topo1,
+ user1Dir);
+ long timeAfter = System.nanoTime();
+
+ String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+ String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.ARCHIVESDIR);
+ assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+ File keyFile = new File(expectedFileDir, key1 + ".0");
+ assertTrue("blob not created", keyFile.exists());
+ assertTrue("blob is not uncompressed", keyFile.isDirectory());
+ File symlinkFile = new File(keyFile, "tmptestsymlink");
+
+ if (supportSymlinks) {
+ assertTrue("blob uncompressed doesn't contain symlink", Files.isSymbolicLink(
+ symlinkFile.toPath()));
+ } else {
+ assertTrue("blob symlink file doesn't exist", symlinkFile.exists());
+ }
+
+ LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+ assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+ LocalizedResource key1rsrc = lrsrcSet.get(key1, true);
+ assertNotNull("Local resource doesn't exist but should", key1rsrc);
+ assertEquals("key doesn't match", key1, key1rsrc.getKey());
+ assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+ assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePathWithVersion());
+ assertEquals("size doesn't match", size, key1rsrc.getSize());
+ assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+ .getLastAccessTime() <= timeAfter));
+
+ timeBefore = System.nanoTime();
+ localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, true);
+ timeAfter = System.nanoTime();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+ key1rsrc = lrsrcSet.get(key1, true);
+ assertNotNull("Local resource doesn't exist but should", key1rsrc);
+ assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
+ assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+ .getLastAccessTime() <= timeAfter));
+
+ // should remove the blob since cache size set really small
+ localizer.cleanup();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ assertFalse("blob contents not deleted", symlinkFile.exists());
+ assertFalse("blob not deleted", keyFile.exists());
+ assertFalse("blob file dir not deleted", new File(expectedFileDir).exists());
+ assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
+ assertNull("user set should be null", lrsrcSet);
+
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ Map<String, Object> conf = new HashMap();
+ // set clean time really high so doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+ String key1 = "key1";
+ String topo1 = "topo1";
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ // set really small so will do cleanup
+ localizer.setTargetCacheSize(1);
+
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+ when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+
+ when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+
+ long timeBefore = System.nanoTime();
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
+ LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
+ user1Dir);
+ long timeAfter = System.nanoTime();
+
+ String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+ String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+ assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+ File keyFile = new File(expectedFileDir, key1);
+ File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+ assertTrue("blob not created", keyFileCurrentSymlink.exists());
+
+ LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+ assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+ LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
+ assertNotNull("Local resource doesn't exist but should", key1rsrc);
+ assertEquals("key doesn't match", key1, key1rsrc.getKey());
+ assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+ assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePath());
+ assertEquals("size doesn't match", 34, key1rsrc.getSize());
+ assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+ .getLastAccessTime() <= timeAfter));
+
+ timeBefore = System.nanoTime();
+ localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+ timeAfter = System.nanoTime();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+ key1rsrc = lrsrcSet.get(key1, false);
+ assertNotNull("Local resource doesn't exist but should", key1rsrc);
+ assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
+ assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+ .getLastAccessTime() <= timeAfter));
+
+ // should remove the blob since cache size set really small
+ localizer.cleanup();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ assertNull("user set should be null", lrsrcSet);
+ assertFalse("blob not deleted", keyFile.exists());
+ assertFalse("blob dir not deleted", new File(expectedFileDir).exists());
+ assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
+ }
+
+ @Test
+ public void testMultipleKeysOneUser() throws Exception {
+ Map<String, Object> conf = new HashMap();
+ // set clean time really high so doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+ String key1 = "key1";
+ String topo1 = "topo1";
+ String key2 = "key2";
+ String key3 = "key3";
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ // set to keep 2 blobs (each of size 34)
+ localizer.setTargetCacheSize(68);
+
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+ when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+ when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+ when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta());
+ when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta());
+
+ List<LocalResource> keys = Arrays.asList(new LocalResource[]{new LocalResource(key1, false),
+ new LocalResource(key2, false), new LocalResource(key3, false)});
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
+
+ List<LocalizedResource> lrsrcs = localizer.getBlobs(keys, user1, topo1, user1Dir);
+ LocalizedResource lrsrc = lrsrcs.get(0);
+ LocalizedResource lrsrc2 = lrsrcs.get(1);
+ LocalizedResource lrsrc3 = lrsrcs.get(2);
+
+ String expectedFileDir = joinPath(baseDir.toString(), USERCACHE, user1,
+ AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+ assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+ File keyFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File keyFile2 = new File(expectedFileDir, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File keyFile3 = new File(expectedFileDir, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+ assertTrue("blob not created", keyFile.exists());
+ assertTrue("blob not created", keyFile2.exists());
+ assertTrue("blob not created", keyFile3.exists());
+ assertEquals("size doesn't match", 34, keyFile.length());
+ assertEquals("size doesn't match", 34, keyFile2.length());
+ assertEquals("size doesn't match", 34, keyFile3.length());
+ assertEquals("size doesn't match", 34, lrsrc.getSize());
+ assertEquals("size doesn't match", 34, lrsrc3.getSize());
+ assertEquals("size doesn't match", 34, lrsrc2.getSize());
+
+ LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
+ assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+
+ long timeBefore = System.nanoTime();
+ localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+ localizer.removeBlobReference(lrsrc2.getKey(), user1, topo1, false);
+ localizer.removeBlobReference(lrsrc3.getKey(), user1, topo1, false);
+ long timeAfter = System.nanoTime();
+
+ // add reference to one and then remove reference again so it has newer timestamp
+ lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+ assertTrue("timestamp not within range", (lrsrc.getLastAccessTime() >= timeBefore && lrsrc
+ .getLastAccessTime() <= timeAfter));
+ localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+
+ // should remove the second blob first
+ localizer.cleanup();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 2, lrsrcSet.getSize());
+ long end = System.currentTimeMillis() + 100;
+ while ((end - System.currentTimeMillis()) >= 0 && keyFile2.exists()) {
+ Thread.sleep(1);
+ }
+ assertFalse("blob not deleted", keyFile2.exists());
+ assertTrue("blob deleted", keyFile.exists());
+ assertTrue("blob deleted", keyFile3.exists());
+
+ // set size to cleanup another one
+ localizer.setTargetCacheSize(34);
+
+ // should remove the third blob
+ localizer.cleanup();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+ assertTrue("blob deleted", keyFile.exists());
+ assertFalse("blob not deleted", keyFile3.exists());
+ }
+
+ @Test(expected = AuthorizationException.class)
+ public void testFailAcls() throws Exception {
+ Map<String, Object> conf = new HashMap();
+ // set clean time really high so doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
+ // enable blobstore acl validation
+ conf.put(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED, true);
+
+ String topo1 = "topo1";
+ String key1 = "key1";
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ // set acl so user doesn't have read access
+ AccessControl acl = new AccessControl(AccessControlType.USER, BlobStoreAclHandler.ADMIN);
+ acl.set_name(user1);
+ rbm.set_settable(new SettableBlobMeta(Arrays.asList(acl)));
+ when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+ when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
+
+ // This should throw AuthorizationException because auth failed
+ localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+ }
+
+ @Test(expected = KeyNotFoundException.class)
+ public void testKeyNotFoundException() throws Exception {
+ Map<String, Object> conf = Utils.readStormConfig();
+ String key1 = "key1";
+ conf.put(Config.STORM_LOCAL_DIR, "target");
+ LocalFsBlobStore bs = new LocalFsBlobStore();
+ LocalFsBlobStore spy = spy(bs);
+ Mockito.doReturn(true).when(spy).checkForBlobOrDownload(key1);
+ Mockito.doNothing().when(spy).checkForBlobUpdate(key1);
+ spy.prepare(conf,null,null);
+ spy.getBlob(key1, null);
+ }
+
+ @Test
+ public void testMultipleUsers() throws Exception {
+ Map<String, Object> conf = new HashMap();
+ // set clean time really high so doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+ String topo1 = "topo1";
+ String topo2 = "topo2";
+ String topo3 = "topo3";
+ String key1 = "key1";
+ String key2 = "key2";
+ String key3 = "key3";
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ // set to keep 2 blobs (each of size 34)
+ localizer.setTargetCacheSize(68);
+
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+ when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+ when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+ when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta());
+ when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta());
+
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
+ File user2Dir = localizer.getLocalUserFileCacheDir(user2);
+ assertTrue("failed to create user dir", user2Dir.mkdirs());
+ File user3Dir = localizer.getLocalUserFileCacheDir(user3);
+ assertTrue("failed to create user dir", user3Dir.mkdirs());
+
+ LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
+ user1Dir);
+ LocalizedResource lrsrc2 = localizer.getBlob(new LocalResource(key2, false), user2, topo2,
+ user2Dir);
+ LocalizedResource lrsrc3 = localizer.getBlob(new LocalResource(key3, false), user3, topo3,
+ user3Dir);
+ // make sure we support different user reading same blob
+ LocalizedResource lrsrc1_user3 = localizer.getBlob(new LocalResource(key1, false), user3,
+ topo3, user3Dir);
+
+ String expectedUserDir1 = joinPath(baseDir.toString(), USERCACHE, user1);
+ String expectedFileDirUser1 = joinPath(expectedUserDir1, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+ String expectedFileDirUser2 = joinPath(baseDir.toString(), USERCACHE, user2,
+ AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+ String expectedFileDirUser3 = joinPath(baseDir.toString(), USERCACHE, user3,
+ AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+ assertTrue("user filecache dir user1 not created", new File(expectedFileDirUser1).exists());
+ assertTrue("user filecache dir user2 not created", new File(expectedFileDirUser2).exists());
+ assertTrue("user filecache dir user3 not created", new File(expectedFileDirUser3).exists());
+
+ File keyFile = new File(expectedFileDirUser1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File keyFile2 = new File(expectedFileDirUser2, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File keyFile3 = new File(expectedFileDirUser3, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File keyFile1user3 = new File(expectedFileDirUser3, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+ assertTrue("blob not created", keyFile.exists());
+ assertTrue("blob not created", keyFile2.exists());
+ assertTrue("blob not created", keyFile3.exists());
+ assertTrue("blob not created", keyFile1user3.exists());
+
+ LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+ LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
+ assertEquals("local resource set size wrong", 1, lrsrcSet2.getSize());
+ LocalizedResourceSet lrsrcSet3 = localizer.getUserResources().get(user3);
+ assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
+
+ localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+ // should remove key1
+ localizer.cleanup();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ lrsrcSet3 = localizer.getUserResources().get(user3);
+ assertNull("user set should be null", lrsrcSet);
+ assertFalse("blob dir not deleted", new File(expectedFileDirUser1).exists());
+ assertFalse("blob dir not deleted", new File(expectedUserDir1).exists());
+ assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
+
+ assertTrue("blob deleted", keyFile2.exists());
+ assertFalse("blob not deleted", keyFile.exists());
+ assertTrue("blob deleted", keyFile3.exists());
+ assertTrue("blob deleted", keyFile1user3.exists());
+ }
+
+ @Test
+ public void testUpdate() throws Exception {
+ Map<String, Object> conf = new HashMap();
+ // set clean time really high so doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+ String key1 = "key1";
+ String topo1 = "topo1";
+ String topo2 = "topo2";
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ rbm.set_version(1);
+ rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+ when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+ when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
+ LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
+ user1Dir);
+
+ String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+ String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+ assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+ File keyFile = new File(expectedFileDir, key1);
+ File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ assertTrue("blob not created", keyFileCurrentSymlink.exists());
+ File versionFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_BLOB_VERSION_SUFFIX);
+ assertTrue("blob version file not created", versionFile.exists());
+ assertEquals("blob version not correct", 1, ServerUtils.localVersionOfBlob(keyFile.toString()));
+
+ LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+
+ // test another topology getting blob with updated version - it should update version now
+ rbm.set_version(2);
+
+ localizer.getBlob(new LocalResource(key1, false), user1, topo2, user1Dir);
+ assertTrue("blob version file not created", versionFile.exists());
+ assertEquals("blob version not correct", 2, ServerUtils.localVersionOfBlob(keyFile.toString()));
+ assertTrue("blob file with version 2 not created", new File(keyFile + ".2").exists());
+
+ // now test regular updateBlob
+ rbm.set_version(3);
+
+ ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
+ arr.add(new LocalResource(key1, false));
+ localizer.updateBlobs(arr, user1);
+ assertTrue("blob version file not created", versionFile.exists());
+ assertEquals("blob version not correct", 3, ServerUtils.localVersionOfBlob(keyFile.toString()));
+ assertTrue("blob file with version 3 not created", new File(keyFile + ".3").exists());
+ }
}
[7/9] storm git commit: Merge branch 'patch-1' of
https://github.com/Ethanlm/storm into PR-2346
Posted by ka...@apache.org.
Merge branch 'patch-1' of https://github.com/Ethanlm/storm into PR-2346
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ddea2ac7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ddea2ac7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ddea2ac7
Branch: refs/heads/master
Commit: ddea2ac7e31955a6bff0e77bb08550f870af3ce0
Parents: b5be1d6 b1f98c5
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Sep 28 16:17:48 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Sep 28 16:17:48 2017 +0900
----------------------------------------------------------------------
.../java/org/apache/storm/daemon/logviewer/LogviewerServer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------