You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/09/28 07:21:36 UTC
[2/9] storm git commit: STORM-2084: Refactor localization to combine
files together
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java b/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
deleted file mode 100644
index 353ab56..0000000
--- a/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
+++ /dev/null
@@ -1,695 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.localizer;
-
-import org.apache.storm.Config;
-import org.apache.storm.DaemonConfig;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.ServerUtils;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.ShellUtils.ExitCodeException;
-import org.apache.storm.utils.ShellUtils.ShellCommandExecutor;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Collection;
-import java.util.ArrayList;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-
-/**
- * Class to download and manage files from the blobstore. It uses an LRU cache
- * to determine which files to keep so they can be reused and which files to delete.
- */
-public class Localizer {
- public static final Logger LOG = LoggerFactory.getLogger(Localizer.class);
- public static final String FILECACHE = "filecache";
- public static final String USERCACHE = "usercache";
- // sub directories to store either files or uncompressed archives respectively
- public static final String FILESDIR = "files";
- public static final String ARCHIVESDIR = "archives";
-
- private static final String TO_UNCOMPRESS = "_tmp_";
-
-
-
- private final Map<String, Object> _conf;
- private final int _threadPoolSize;
- // thread pool for initial download
- private final ExecutorService _execService;
- // thread pool for updates
- private final ExecutorService _updateExecService;
- private final int _blobDownloadRetries;
-
- // track resources - user to resourceSet
- private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new
- ConcurrentHashMap<String, LocalizedResourceSet>();
-
- private final String _localBaseDir;
-
- // cleanup
- private long _cacheTargetSize;
- private long _cacheCleanupPeriod;
- private ScheduledExecutorService _cacheCleanupService;
-
- public Localizer(Map<String, Object> conf, String baseDir) {
- _conf = conf;
- _localBaseDir = baseDir;
- // default cache size 10GB, converted to Bytes
- _cacheTargetSize = ObjectReader.getInt(_conf.get(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB),
- 10 * 1024).longValue() << 20;
- // default 10 minutes.
- _cacheCleanupPeriod = ObjectReader.getInt(_conf.get(
- DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 10 * 60 * 1000).longValue();
-
- // if we needed we could make config for update thread pool size
- _threadPoolSize = ObjectReader.getInt(_conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
- _blobDownloadRetries = ObjectReader.getInt(_conf.get(
- DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
-
- _execService = Executors.newFixedThreadPool(_threadPoolSize);
- _updateExecService = Executors.newFixedThreadPool(_threadPoolSize);
- reconstructLocalizedResources();
- }
-
- // For testing, it allows setting size in bytes
- protected void setTargetCacheSize(long size) {
- _cacheTargetSize = size;
- }
-
- // For testing, be careful as it doesn't clone
- ConcurrentMap<String, LocalizedResourceSet> getUserResources() {
- return _userRsrc;
- }
-
- public void startCleaner() {
- _cacheCleanupService = new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryBuilder()
- .setNameFormat("Localizer Cache Cleanup")
- .build());
-
- _cacheCleanupService.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- handleCacheCleanup();
- }
- }, _cacheCleanupPeriod, _cacheCleanupPeriod, TimeUnit.MILLISECONDS);
- }
-
- public void shutdown() {
- if (_cacheCleanupService != null) {
- _cacheCleanupService.shutdown();
- }
- if (_execService != null) {
- _execService.shutdown();
- }
- if (_updateExecService != null) {
- _updateExecService.shutdown();
- }
- }
-
- // baseDir/supervisor/usercache/
- protected File getUserCacheDir() {
- return new File(_localBaseDir, USERCACHE);
- }
-
- // baseDir/supervisor/usercache/user1/
- protected File getLocalUserDir(String userName) {
- return new File(getUserCacheDir(), userName);
- }
-
- // baseDir/supervisor/usercache/user1/filecache
- public File getLocalUserFileCacheDir(String userName) {
- return new File(getLocalUserDir(userName), FILECACHE);
- }
-
- // baseDir/supervisor/usercache/user1/filecache/files
- protected File getCacheDirForFiles(File dir) {
- return new File(dir, FILESDIR);
- }
-
- // get the directory to put uncompressed archives in
- // baseDir/supervisor/usercache/user1/filecache/archives
- protected File getCacheDirForArchives(File dir) {
- return new File(dir, ARCHIVESDIR);
- }
-
- protected void addLocalizedResourceInDir(String dir, LocalizedResourceSet lrsrcSet,
- boolean uncompress) {
- File[] lrsrcs = readCurrentBlobs(dir);
-
- if (lrsrcs != null) {
- for (File rsrc : lrsrcs) {
- LOG.info("add localized in dir found: " + rsrc);
- /// strip off .suffix
- String path = rsrc.getPath();
- int p = path.lastIndexOf('.');
- if (p > 0) {
- path = path.substring(0, p);
- }
- LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path);
- LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path,
- uncompress);
- lrsrcSet.add(lrsrc.getKey(), lrsrc, uncompress);
- }
- }
- }
-
- // Looks for files in the directory with .current suffix
- protected File[] readCurrentBlobs(String location) {
- File dir = new File(location);
- File[] files = null;
- if (dir.exists()) {
- files = dir.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.toLowerCase().endsWith(ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
- }
- });
- }
- return files;
- }
-
- // Check to see if there are any existing files already localized.
- protected void reconstructLocalizedResources() {
- try {
- LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath());
- Collection<File> users = ConfigUtils.readDirFiles(getUserCacheDir().getPath());
- if (!(users == null || users.isEmpty())) {
- for (File userDir : users) {
- String user = userDir.getName();
- LOG.debug("looking in: {} for user: {}", userDir.getPath(), user);
- LocalizedResourceSet newSet = new LocalizedResourceSet(user);
- LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
- if (lrsrcSet == null) {
- lrsrcSet = newSet;
- }
- addLocalizedResourceInDir(getCacheDirForFiles(getLocalUserFileCacheDir(user)).getPath(),
- lrsrcSet, false);
- addLocalizedResourceInDir(
- getCacheDirForArchives(getLocalUserFileCacheDir(user)).getPath(),
- lrsrcSet, true);
- }
- } else {
- LOG.warn("No left over resources found for any user during reconstructing of local resources at: {}", getUserCacheDir().getPath());
- }
- } catch (Exception e) {
- LOG.error("ERROR reconstructing localized resources", e);
- }
- }
-
- // ignores invalid user/topo/key
- public synchronized void removeBlobReference(String key, String user, String topo,
- boolean uncompress) throws AuthorizationException, KeyNotFoundException {
- LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
- if (lrsrcSet != null) {
- LocalizedResource lrsrc = lrsrcSet.get(key, uncompress);
- if (lrsrc != null) {
- LOG.debug("removing blob reference to: {} for topo: {}", key, topo);
- lrsrc.removeReference(topo);
- } else {
- LOG.warn("trying to remove non-existent blob, key: " + key + " for user: " + user +
- " topo: " + topo);
- }
- } else {
- LOG.warn("trying to remove blob for non-existent resource set for user: " + user + " key: "
- + key + " topo: " + topo);
- }
- }
-
- public synchronized void addReferences(List<LocalResource> localresource, String user,
- String topo) {
- LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
- if (lrsrcSet != null) {
- for (LocalResource blob : localresource) {
- LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName(), blob.shouldUncompress());
- if (lrsrc != null) {
- lrsrc.addReference(topo);
- LOG.debug("added reference for topo: {} key: {}", topo, blob);
- } else {
- LOG.warn("trying to add reference to non-existent blob, key: " + blob + " topo: " + topo);
- }
- }
- } else {
- LOG.warn("trying to add reference to non-existent local resource set, " +
- "user: " + user + " topo: " + topo);
- }
- }
-
- /**
- * This function either returns the blob in the existing cache or if it doesn't exist in the
- * cache, it will download the blob and will block until the download is complete.
- */
- public LocalizedResource getBlob(LocalResource localResource, String user, String topo,
- File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException {
- ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
- arr.add(localResource);
- List<LocalizedResource> results = getBlobs(arr, user, topo, userFileDir);
- if (results.isEmpty() || results.size() != 1) {
- throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + user +
- ", topo: " + topo);
- }
- return results.get(0);
- }
-
- protected boolean isLocalizedResourceDownloaded(LocalizedResource lrsrc) {
- File rsrcFileCurrent = new File(lrsrc.getCurrentSymlinkPath());
- File rsrcFileWithVersion = new File(lrsrc.getFilePathWithVersion());
- File versionFile = new File(lrsrc.getVersionFilePath());
- return (rsrcFileWithVersion.exists() && rsrcFileCurrent.exists() && versionFile.exists());
- }
-
- protected boolean isLocalizedResourceUpToDate(LocalizedResource lrsrc,
- ClientBlobStore blobstore) throws AuthorizationException, KeyNotFoundException {
- String localFile = lrsrc.getFilePath();
- long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(lrsrc.getKey(), blobstore);
- long currentBlobVersion = ServerUtils.localVersionOfBlob(localFile);
- return (nimbusBlobVersion == currentBlobVersion);
- }
-
- protected ClientBlobStore getClientBlobStore() {
- return ServerUtils.getClientBlobStoreForSupervisor(_conf);
- }
-
- /**
- * This function updates blobs on the supervisor. It uses a separate thread pool and runs
- * asynchronously of the download and delete.
- */
- public List<LocalizedResource> updateBlobs(List<LocalResource> localResources,
- String user) throws AuthorizationException, KeyNotFoundException, IOException {
- LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
- ArrayList<LocalizedResource> results = new ArrayList<>();
- ArrayList<Callable<LocalizedResource>> updates = new ArrayList<>();
-
- if (lrsrcSet == null) {
- // resource set must have been removed
- return results;
- }
- ClientBlobStore blobstore = null;
- try {
- blobstore = getClientBlobStore();
- for (LocalResource localResource: localResources) {
- String key = localResource.getBlobName();
- LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
- if (lrsrc == null) {
- LOG.warn("blob requested for update doesn't exist: {}", key);
- continue;
- } else if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
- LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
- continue;
- } else {
- // update it if either the version isn't the latest or if any local blob files are missing
- if (!isLocalizedResourceUpToDate(lrsrc, blobstore) ||
- !isLocalizedResourceDownloaded(lrsrc)) {
- LOG.debug("updating blob: {}", key);
- updates.add(new DownloadBlob(this, _conf, key, new File(lrsrc.getFilePath()), user,
- lrsrc.isUncompressed(), true));
- }
- }
- }
- } finally {
- if(blobstore != null) {
- blobstore.shutdown();
- }
- }
- try {
- List<Future<LocalizedResource>> futures = _updateExecService.invokeAll(updates);
- for (Future<LocalizedResource> futureRsrc : futures) {
- try {
- LocalizedResource lrsrc = futureRsrc.get();
- // put the resource just in case it was removed at same time by the cleaner
- LocalizedResourceSet newSet = new LocalizedResourceSet(user);
- LocalizedResourceSet newlrsrcSet = _userRsrc.putIfAbsent(user, newSet);
- if (newlrsrcSet == null) {
- newlrsrcSet = newSet;
- }
- newlrsrcSet.putIfAbsent(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
- results.add(lrsrc);
- }
- catch (ExecutionException e) {
- LOG.error("Error updating blob: ", e);
- if (e.getCause() instanceof AuthorizationException) {
- throw (AuthorizationException)e.getCause();
- }
- if (e.getCause() instanceof KeyNotFoundException) {
- throw (KeyNotFoundException)e.getCause();
- }
- }
- }
- } catch (RejectedExecutionException re) {
- LOG.error("Error updating blobs : ", re);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted Exception", ie);
- }
- return results;
- }
-
- /**
- * This function either returns the blobs in the existing cache or if they don't exist in the
- * cache, it downloads them in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT)
- * and will block until all of them have been downloaded
- */
- public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources,
- String user, String topo, File userFileDir)
- throws AuthorizationException, KeyNotFoundException, IOException {
- if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
- throw new KeyNotFoundException("symlinks are disabled so blobs cannot be downloaded.");
- }
- LocalizedResourceSet newSet = new LocalizedResourceSet(user);
- LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
- if (lrsrcSet == null) {
- lrsrcSet = newSet;
- }
- ArrayList<LocalizedResource> results = new ArrayList<>();
- ArrayList<Callable<LocalizedResource>> downloads = new ArrayList<>();
-
- ClientBlobStore blobstore = null;
- try {
- blobstore = getClientBlobStore();
- for (LocalResource localResource: localResources) {
- String key = localResource.getBlobName();
- boolean uncompress = localResource.shouldUncompress();
- LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
- boolean isUpdate = false;
- if ((lrsrc != null) && (lrsrc.isUncompressed() == localResource.shouldUncompress()) &&
- (isLocalizedResourceDownloaded(lrsrc))) {
- if (isLocalizedResourceUpToDate(lrsrc, blobstore)) {
- LOG.debug("blob already exists: {}", key);
- lrsrc.addReference(topo);
- results.add(lrsrc);
- continue;
- }
- LOG.debug("blob exists but isn't up to date: {}", key);
- isUpdate = true;
- }
-
- // go off to blobstore and get it
- // assume dir passed in exists and has correct permission
- LOG.debug("fetching blob: {}", key);
- File downloadDir = getCacheDirForFiles(userFileDir);
- File localFile = new File(downloadDir, key);
- if (uncompress) {
- // for compressed file, download to archives dir
- downloadDir = getCacheDirForArchives(userFileDir);
- localFile = new File(downloadDir, key);
- }
- downloadDir.mkdir();
- downloads.add(new DownloadBlob(this, _conf, key, localFile, user, uncompress,
- isUpdate));
- }
- } finally {
- if(blobstore !=null) {
- blobstore.shutdown();
- }
- }
- try {
- List<Future<LocalizedResource>> futures = _execService.invokeAll(downloads);
- for (Future<LocalizedResource> futureRsrc: futures) {
- LocalizedResource lrsrc = futureRsrc.get();
- lrsrc.addReference(topo);
- lrsrcSet.add(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
- results.add(lrsrc);
- }
- } catch (ExecutionException e) {
- if (e.getCause() instanceof AuthorizationException)
- throw (AuthorizationException)e.getCause();
- else if (e.getCause() instanceof KeyNotFoundException) {
- throw (KeyNotFoundException)e.getCause();
- } else {
- throw new IOException("Error getting blobs", e);
- }
- } catch (RejectedExecutionException re) {
- throw new IOException("RejectedExecutionException: ", re);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted Exception", ie);
- }
- return results;
- }
-
- static class DownloadBlob implements Callable<LocalizedResource> {
-
- private Localizer _localizer;
- private Map _conf;
- private String _key;
- private File _localFile;
- private String _user;
- private boolean _uncompress;
- private boolean _isUpdate;
-
- public DownloadBlob(Localizer localizer, Map<String, Object> conf, String key, File localFile,
- String user, boolean uncompress, boolean update) {
- _localizer = localizer;
- _conf = conf;
- _key = key;
- _localFile = localFile;
- _user = user;
- _uncompress = uncompress;
- _isUpdate = update;
- }
-
- @Override
- public LocalizedResource call()
- throws AuthorizationException, KeyNotFoundException, IOException {
- return _localizer.downloadBlob(_conf, _key, _localFile, _user, _uncompress,
- _isUpdate);
- }
- }
-
- private LocalizedResource downloadBlob(Map<String, Object> conf, String key, File localFile,
- String user, boolean uncompress, boolean isUpdate)
- throws AuthorizationException, KeyNotFoundException, IOException {
- ClientBlobStore blobstore = null;
- try {
- blobstore = getClientBlobStore();
- long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(key, blobstore);
- long oldVersion = ServerUtils.localVersionOfBlob(localFile.toString());
- FileOutputStream out = null;
- PrintWriter writer = null;
- int numTries = 0;
- String localizedPath = localFile.toString();
- String localFileWithVersion = ServerUtils.constructBlobWithVersionFileName(localFile.toString(),
- nimbusBlobVersion);
- String localVersionFile = ServerUtils.constructVersionFileName(localFile.toString());
- String downloadFile = localFileWithVersion;
- if (uncompress) {
- // we need to download to temp file and then unpack into the one requested
- downloadFile = new File(localFile.getParent(), TO_UNCOMPRESS + localFile.getName()).toString();
- }
- while (numTries < _blobDownloadRetries) {
- out = new FileOutputStream(downloadFile);
- numTries++;
- try {
- if (!ServerUtils.canUserReadBlob(blobstore.getBlobMeta(key), user, conf)) {
- throw new AuthorizationException(user + " does not have READ access to " + key);
- }
- InputStreamWithMeta in = blobstore.getBlob(key);
- byte[] buffer = new byte[1024];
- int len;
- while ((len = in.read(buffer)) >= 0) {
- out.write(buffer, 0, len);
- }
- out.close();
- in.close();
- if (uncompress) {
- ServerUtils.unpack(new File(downloadFile), new File(localFileWithVersion));
- LOG.debug("uncompressed " + downloadFile + " to: " + localFileWithVersion);
- }
-
- // Next write the version.
- LOG.info("Blob: " + key + " updated with new Nimbus-provided version: " +
- nimbusBlobVersion + " local version was: " + oldVersion);
- // The false parameter ensures overwriting the version file, not appending
- writer = new PrintWriter(
- new BufferedWriter(new FileWriter(localVersionFile, false)));
- writer.println(nimbusBlobVersion);
- writer.close();
-
- try {
- setBlobPermissions(conf, user, localFileWithVersion);
- setBlobPermissions(conf, user, localVersionFile);
-
- // Update the key.current symlink. First create tmp symlink and do
- // move of tmp to current so that the operation is atomic.
- String tmp_uuid_local = java.util.UUID.randomUUID().toString();
- LOG.debug("Creating a symlink @" + localFile + "." + tmp_uuid_local + " , " +
- "linking to: " + localFile + "." + nimbusBlobVersion);
- File uuid_symlink = new File(localFile + "." + tmp_uuid_local);
-
- Files.createSymbolicLink(uuid_symlink.toPath(),
- Paths.get(ServerUtils.constructBlobWithVersionFileName(localFile.toString(),
- nimbusBlobVersion)));
- File current_symlink = new File(ServerUtils.constructBlobCurrentSymlinkName(
- localFile.toString()));
- Files.move(uuid_symlink.toPath(), current_symlink.toPath(), ATOMIC_MOVE);
- } catch (IOException e) {
- // if we fail after writing the version file but before we move current link we need to
- // restore the old version to the file
- try {
- PrintWriter restoreWriter = new PrintWriter(
- new BufferedWriter(new FileWriter(localVersionFile, false)));
- restoreWriter.println(oldVersion);
- restoreWriter.close();
- } catch (IOException ignore) {}
- throw e;
- }
-
- String oldBlobFile = localFile + "." + oldVersion;
- try {
- // Remove the old version. Note that if a number of processes have that file open,
- // the OS will keep the old blob file around until they all close the handle and only
- // then deletes it. No new process will open the old blob, since the users will open the
- // blob through the "blob.current" symlink, which always points to the latest version of
- // a blob. Remove the old version after the current symlink is updated as to not affect
- // anyone trying to read it.
- if ((oldVersion != -1) && (oldVersion != nimbusBlobVersion)) {
- LOG.info("Removing an old blob file:" + oldBlobFile);
- Files.delete(Paths.get(oldBlobFile));
- }
- } catch (IOException e) {
- // At this point we have downloaded everything and moved symlinks. If the remove of
- // old fails just log an error
- LOG.error("Exception removing old blob version: " + oldBlobFile);
- }
-
- break;
- } catch (AuthorizationException ae) {
- // we consider this non-retriable exceptions
- if (out != null) {
- out.close();
- }
- new File(downloadFile).delete();
- throw ae;
- } catch (IOException | KeyNotFoundException e) {
- if (out != null) {
- out.close();
- }
- if (writer != null) {
- writer.close();
- }
- new File(downloadFile).delete();
- if (uncompress) {
- try {
- FileUtils.deleteDirectory(new File(localFileWithVersion));
- } catch (IOException ignore) {}
- }
- if (!isUpdate) {
- // don't want to remove existing version file if its an update
- new File(localVersionFile).delete();
- }
-
- if (numTries < _blobDownloadRetries) {
- LOG.error("Failed to download blob, retrying", e);
- } else {
- throw e;
- }
- }
- }
- return new LocalizedResource(key, localizedPath, uncompress);
- } finally {
- if(blobstore != null) {
- blobstore.shutdown();
- }
- }
- }
-
- public void setBlobPermissions(Map<String, Object> conf, String user, String path)
- throws IOException {
-
- if (!ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
- return;
- }
- String wlCommand = ObjectReader.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), "");
- if (wlCommand.isEmpty()) {
- String stormHome = System.getProperty("storm.home");
- wlCommand = stormHome + "/bin/worker-launcher";
- }
- List<String> command = new ArrayList<String>(Arrays.asList(wlCommand, user, "blob", path));
-
- String[] commandArray = command.toArray(new String[command.size()]);
- ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
- LOG.info("Setting blob permissions, command: {}", Arrays.toString(commandArray));
-
- try {
- shExec.execute();
- LOG.debug("output: {}", shExec.getOutput());
- } catch (ExitCodeException e) {
- int exitCode = shExec.getExitCode();
- LOG.warn("Exit code from worker-launcher is : " + exitCode, e);
- LOG.debug("output: {}", shExec.getOutput());
- throw new IOException("Setting blob permissions failed" +
- " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
- }
- }
-
-
- public synchronized void handleCacheCleanup() {
- LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(_cacheTargetSize);
- // need one large set of all and then clean via LRU
- for (LocalizedResourceSet t : _userRsrc.values()) {
- toClean.addResources(t);
- LOG.debug("Resources to be cleaned after adding {} : {}", t.getUser(), toClean);
- }
- toClean.cleanup();
- LOG.debug("Resource cleanup: {}", toClean);
- for (LocalizedResourceSet t : _userRsrc.values()) {
- if (t.getSize() == 0) {
- String user = t.getUser();
-
- LOG.debug("removing empty set: {}", user);
- File userFileCacheDir = getLocalUserFileCacheDir(user);
- getCacheDirForFiles(userFileCacheDir).delete();
- getCacheDirForArchives(userFileCacheDir).delete();
- getLocalUserFileCacheDir(user).delete();
- boolean dirsRemoved = getLocalUserDir(user).delete();
- // to catch race with update thread
- if (dirsRemoved) {
- _userRsrc.remove(user);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 311acda..7b127d2 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -33,23 +33,26 @@ import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.blobstore.LocalFsBlobStore;
import org.apache.storm.daemon.StormCommon;
-import org.apache.storm.generated.*;
-import org.apache.storm.localizer.Localizer;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.thrift.TException;
-import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
-import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
@@ -61,7 +64,6 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@@ -285,10 +287,6 @@ public class ServerUtils {
return Files.getOwner(FileSystems.getDefault().getPath(path)).getName();
}
- public static Localizer createLocalizer(Map<String, Object> conf, String baseDir) {
- return new Localizer(conf, baseDir);
- }
-
public static String containerFilePath (String dir) {
return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
}
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index eb25566..60b628e 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -25,7 +25,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.storm.daemon.supervisor.Slot.StaticState;
@@ -39,7 +39,7 @@ import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
@@ -115,7 +115,7 @@ public class SlotTest {
@Test
public void testEmptyToEmpty() throws Exception {
try (SimulatedTime t = new SimulatedTime(1010)){
- ILocalizer localizer = mock(ILocalizer.class);
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
LocalState state = mock(LocalState.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
ISupervisor iSuper = mock(ISupervisor.class);
@@ -137,7 +137,7 @@ public class SlotTest {
LocalAssignment newAssignment =
mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0));
- ILocalizer localizer = mock(ILocalizer.class);
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
Container container = mock(Container.class);
LocalState state = mock(LocalState.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -146,11 +146,11 @@ public class SlotTest {
when(container.readHeartbeat()).thenReturn(hb, hb);
@SuppressWarnings("unchecked")
- Future<Void> baseFuture = mock(Future.class);
+ CompletableFuture<Void> baseFuture = mock(CompletableFuture.class);
when(localizer.requestDownloadBaseTopologyBlobs(newAssignment, port)).thenReturn(baseFuture);
@SuppressWarnings("unchecked")
- Future<Void> blobFuture = mock(Future.class);
+ CompletableFuture<Void> blobFuture = mock(CompletableFuture.class);
when(localizer.requestDownloadTopologyBlobs(newAssignment, port)).thenReturn(blobFuture);
ISupervisor iSuper = mock(ISupervisor.class);
@@ -220,7 +220,7 @@ public class SlotTest {
LocalAssignment assignment =
mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0));
- ILocalizer localizer = mock(ILocalizer.class);
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
Container container = mock(Container.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs()-10);
@@ -276,7 +276,7 @@ public class SlotTest {
LocalAssignment nAssignment =
mkLocalAssignment(nTopoId, nExecList, mkWorkerResources(100.0, 100.0, 100.0));
- ILocalizer localizer = mock(ILocalizer.class);
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
Container nContainer = mock(Container.class);
LocalState state = mock(LocalState.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -285,11 +285,11 @@ public class SlotTest {
when(nContainer.readHeartbeat()).thenReturn(nhb, nhb);
@SuppressWarnings("unchecked")
- Future<Void> baseFuture = mock(Future.class);
+ CompletableFuture<Void> baseFuture = mock(CompletableFuture.class);
when(localizer.requestDownloadBaseTopologyBlobs(nAssignment, port)).thenReturn(baseFuture);
@SuppressWarnings("unchecked")
- Future<Void> blobFuture = mock(Future.class);
+ CompletableFuture<Void> blobFuture = mock(CompletableFuture.class);
when(localizer.requestDownloadTopologyBlobs(nAssignment, port)).thenReturn(blobFuture);
ISupervisor iSuper = mock(ISupervisor.class);
@@ -377,7 +377,7 @@ public class SlotTest {
when(cContainer.readHeartbeat()).thenReturn(chb);
when(cContainer.areAllProcessesDead()).thenReturn(false, true);
- ILocalizer localizer = mock(ILocalizer.class);
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
ISupervisor iSuper = mock(ISupervisor.class);
@@ -437,7 +437,7 @@ public class SlotTest {
when(cContainer.readHeartbeat()).thenReturn(chb, chb, chb, chb, chb, chb);
when(cContainer.runProfiling(any(ProfileRequest.class), anyBoolean())).thenReturn(true);
- ILocalizer localizer = mock(ILocalizer.class);
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
ISupervisor iSuper = mock(ISupervisor.class);
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 2461b33..f49be63 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -15,24 +15,51 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.localizer;
+import static org.apache.storm.localizer.AsyncLocalizer.USERCACHE;
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
+import com.google.common.base.Joiner;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.storm.DaemonConfig;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.blobstore.LocalFsBlobStore;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Utils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.apache.storm.Config;
@@ -41,9 +68,16 @@ import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.security.auth.DefaultPrincipalToLocal;
+import org.mockito.Mockito;
public class AsyncLocalizerTest {
+ private static String getTestLocalizerRoot() { // Builds a localizer root path under ./target that is named after a method on the call stack
+ File f = new File("./target/" + Thread.currentThread().getStackTrace()[2].getMethodName() + "/localizer/"); // frame [2] is presumably the calling test method - TODO confirm on the target JVM
+ f.deleteOnExit(); // NOTE(review): only registers the path for deletion at JVM exit; nothing in this method creates the directory
+ return f.getPath(); // NOTE(review): java.io.File normally drops the trailing separator given above - verify callers that concatenate directly onto the result
+ }
+
@Test
public void testRequestDownloadBaseTopologyBlobs() throws Exception {
final String topoId = "TOPO";
@@ -68,7 +102,6 @@ public class AsyncLocalizerTest {
conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName());
conf.put(Config.STORM_CLUSTER_MODE, "distributed");
conf.put(Config.STORM_LOCAL_DIR, stormLocal);
- Localizer localizer = mock(Localizer.class);
AdvancedFSOps ops = mock(AdvancedFSOps.class);
ConfigUtils mockedCU = mock(ConfigUtils.class);
ReflectionUtils mockedRU = mock(ReflectionUtils.class);
@@ -76,7 +109,7 @@ public class AsyncLocalizerTest {
Map<String, Object> topoConf = new HashMap<>(conf);
- AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops);
+ AsyncLocalizer bl = new AsyncLocalizer(conf, getTestLocalizerRoot(), ops, new AtomicReference<>(new HashMap<>()), null);
ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
ReflectionUtils origRU = ReflectionUtils.setInstance(mockedRU);
ServerUtils origUtils = ServerUtils.setInstance(mockedU);
@@ -86,7 +119,7 @@ public class AsyncLocalizerTest {
when(mockedRU.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);
when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
- Future<Void> f = al.requestDownloadBaseTopologyBlobs(la, port);
+ Future<Void> f = bl.requestDownloadBaseTopologyBlobs(la, port);
f.get(20, TimeUnit.SECONDS);
// We should be done now...
@@ -102,7 +135,7 @@ public class AsyncLocalizerTest {
verify(ops, never()).deleteIfExists(any(File.class));
} finally {
- al.shutdown();
+ bl.close();
ConfigUtils.setInstance(orig);
ReflectionUtils.setInstance(origRU);
ServerUtils.setInstance(origUtils);
@@ -129,7 +162,7 @@ public class AsyncLocalizerTest {
final File userDir = new File(stormLocal, user);
final String stormRoot = stormLocal+topoId+"/";
- final String localizerRoot = "/tmp/storm-localizer/";
+ final String localizerRoot = getTestLocalizerRoot();
final String simpleLocalFile = localizerRoot + user + "/simple";
final String simpleCurrentLocalFile = localizerRoot + user + "/simple.current";
@@ -146,7 +179,6 @@ public class AsyncLocalizerTest {
Map<String, Object> conf = new HashMap<>();
conf.put(Config.STORM_LOCAL_DIR, stormLocal);
- Localizer localizer = mock(Localizer.class);
AdvancedFSOps ops = mock(AdvancedFSOps.class);
ConfigUtils mockedCU = mock(ConfigUtils.class);
@@ -157,33 +189,662 @@ public class AsyncLocalizerTest {
List<LocalizedResource> localizedList = new ArrayList<>();
LocalizedResource simpleLocal = new LocalizedResource(simpleKey, simpleLocalFile, false);
localizedList.add(simpleLocal);
-
- AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops);
+
+ AsyncLocalizer bl = spy(new AsyncLocalizer(conf, localizerRoot, ops, new AtomicReference<>(new HashMap<>()), null));
ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
try {
when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot);
when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
when(mockedCU.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(st);
-
- when(localizer.getLocalUserFileCacheDir(user)).thenReturn(userDir);
-
- when(localizer.getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir))).thenReturn(localizedList);
-
- Future<Void> f = al.requestDownloadTopologyBlobs(la, port);
+
+ //Write the mocking backwards so the actual method is not called on the spy object
+ doReturn(userDir).when(bl).getLocalUserFileCacheDir(user);
+ doReturn(localizedList).when(bl).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
+
+ Future<Void> f = bl.requestDownloadTopologyBlobs(la, port);
f.get(20, TimeUnit.SECONDS);
// We should be done now...
-
- verify(localizer).getLocalUserFileCacheDir(user);
+
+ verify(bl).getLocalUserFileCacheDir(user);
+
verify(ops).fileExists(userDir);
verify(ops).forceMkdir(userDir);
-
- verify(localizer).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
+
+ verify(bl).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
verify(ops).createSymlink(new File(stormRoot, simpleLocalName), new File(simpleCurrentLocalFile));
} finally {
- al.shutdown();
+ bl.close();
ConfigUtils.setInstance(orig);
}
}
+
+ // Fixtures and helpers carried over from LocalizerTest
+ private File baseDir; // per-test scratch directory; created in setUp() and removed in tearDown()
+
+ private final String user1 = "user1"; // user names the tests below use to build per-user cache paths
+ private final String user2 = "user2";
+ private final String user3 = "user3";
+
+ private ClientBlobStore mockblobstore = mock(ClientBlobStore.class); // handed out by TestLocalizer.getClientBlobStore()
+
+ /** AsyncLocalizer subclass whose blob store client is replaced by the shared {@code mockblobstore} mock. */
+ class TestLocalizer extends AsyncLocalizer {
+
+ TestLocalizer(Map<String, Object> conf, String baseDir) throws IOException {
+ super(conf, baseDir, AdvancedFSOps.make(conf), new AtomicReference<>(new HashMap<>()), null); // FS ops from AdvancedFSOps.make(conf), a fresh empty map reference, and null for the last argument
+ }
+
+ @Override
+ protected ClientBlobStore getClientBlobStore() {
+ return mockblobstore; // the tests stub getBlob/getBlobMeta on this mock
+ }
+ }
+ /** InputStreamWithMeta test double that wraps an in-memory or caller-supplied stream and reports fixed metadata. */
+ class TestInputStreamWithMeta extends InputStreamWithMeta {
+ private InputStream iostream; // wrapped data source; this class defines no close() of its own
+
+ public TestInputStreamWithMeta() {
+ iostream = IOUtils.toInputStream("some test data for my input stream"); // 34 characters; the tests assert a blob size of 34
+ }
+
+ public TestInputStreamWithMeta(InputStream istream) {
+ iostream = istream;
+ }
+
+ @Override
+ public long getVersion() throws IOException {
+ return 1; // fixed version
+ }
+
+ @Override
+ public synchronized int read() {
+ return 0; // NOTE(review): never consults iostream and never returns -1; only read(byte[]) yields real data
+ }
+
+ @Override
+ public synchronized int read(byte[] b)
+ throws IOException {
+ int length = iostream.read(b); // delegate to the wrapped stream
+ if (length == 0) {
+ return -1; // a zero-length read is reported to the caller as end of stream
+ }
+ return length;
+ }
+
+ @Override
+ public long getFileLength() {
+ return 0; // fixed value, not the length of the wrapped data
+ }
+ };
+ /** Creates a uniquely named scratch directory under java.io.tmpdir before each test. */
+ @Before
+ public void setUp() throws Exception {
+ baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-"+ UUID.randomUUID()); // random UUID suffix keeps the directory name unique
+ if (!baseDir.mkdir()) {
+ throw new IOException("failed to create base directory");
+ }
+ }
+ /** Recursively deletes the scratch directory after each test. */
+ @After
+ public void tearDown() throws Exception {
+ try {
+ FileUtils.deleteDirectory(baseDir);
+ } catch (IOException ignore) {} // a failed delete is ignored and does not fail the test
+ }
+ /** Joins the given path segments with {@code File.separator}. */
+ protected String joinPath(String... pathList) {
+ return Joiner.on(File.separator).join(pathList);
+ }
+ /** Returns {@code base}, USERCACHE and {@code user} joined into one path. */
+ public String constructUserCacheDir(String base, String user) {
+ return joinPath(base, USERCACHE, user);
+ }
+ /** Returns the user cache dir for {@code user} extended with AsyncLocalizer.FILECACHE and AsyncLocalizer.FILESDIR. */
+ public String constructExpectedFilesDir(String base, String user) {
+ return joinPath(constructUserCacheDir(base, user), AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+ }
+ /** Returns the user cache dir for {@code user} extended with AsyncLocalizer.FILECACHE and AsyncLocalizer.ARCHIVESDIR. */
+ public String constructExpectedArchivesDir(String base, String user) {
+ return joinPath(constructUserCacheDir(base, user), AsyncLocalizer.FILECACHE, AsyncLocalizer.ARCHIVESDIR);
+ }
+ /** Checks the per-user directory and per-user file cache directory paths reported by the localizer. */
+ @Test
+ public void testDirPaths() throws Exception {
+ Map<String, Object> conf = new HashMap();
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+ String expectedDir = constructUserCacheDir(baseDir.toString(), user1); // baseDir/USERCACHE/user1
+ assertEquals("get local user dir doesn't return right value",
+ expectedDir, localizer.getLocalUserDir(user1).toString());
+
+ String expectedFileDir = joinPath(expectedDir, AsyncLocalizer.FILECACHE); // the file cache dir is expected directly below the user dir
+ assertEquals("get local user file dir doesn't return right value",
+ expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString());
+ }
+ /** Pre-creates cache files and dirs on disk, builds a localizer over them, and checks the resource sets and ref counts it reports. */
+ @Test
+ public void testReconstruct() throws Exception {
+ Map<String, Object> conf = new HashMap();
+
+ String expectedFileDir1 = constructExpectedFilesDir(baseDir.toString(), user1);
+ String expectedArchiveDir1 = constructExpectedArchivesDir(baseDir.toString(), user1);
+ String expectedFileDir2 = constructExpectedFilesDir(baseDir.toString(), user2);
+ String expectedArchiveDir2 = constructExpectedArchivesDir(baseDir.toString(), user2);
+
+ String key1 = "testfile1.txt";
+ String key2 = "testfile2.txt";
+ String key3 = "testfile3.txt";
+ String key4 = "testfile4.txt";
+
+ String archive1 = "archive1";
+ String archive2 = "archive2";
+
+ File user1file1 = new File(expectedFileDir1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); // on-disk names are the key plus the "current" suffix
+ File user1file2 = new File(expectedFileDir1, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File user2file3 = new File(expectedFileDir2, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File user2file4 = new File(expectedFileDir2, key4 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+ File user1archive1 = new File(expectedArchiveDir1, archive1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); // archives are created as directories below
+ File user2archive2 = new File(expectedArchiveDir2, archive2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File user1archive1file = new File(user1archive1, "file1");
+ File user2archive2file = new File(user2archive2, "file2");
+
+ // setup some files/dirs to emulate supervisor restart
+ assertTrue("Failed setup filecache dir1", new File(expectedFileDir1).mkdirs());
+ assertTrue("Failed setup filecache dir2", new File(expectedFileDir2).mkdirs());
+ assertTrue("Failed setup file1", user1file1.createNewFile());
+ assertTrue("Failed setup file2", user1file2.createNewFile());
+ assertTrue("Failed setup file3", user2file3.createNewFile());
+ assertTrue("Failed setup file4", user2file4.createNewFile());
+ assertTrue("Failed setup archive dir1", user1archive1.mkdirs());
+ assertTrue("Failed setup archive dir2", user2archive2.mkdirs());
+ assertTrue("Failed setup file in archivedir1", user1archive1file.createNewFile());
+ assertTrue("Failed setup file in archivedir2", user2archive2file.createNewFile());
+
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString()); // constructed only after the files exist
+
+ ArrayList<LocalResource> arrUser1Keys = new ArrayList<LocalResource>();
+ arrUser1Keys.add(new LocalResource(key1, false));
+ arrUser1Keys.add(new LocalResource(archive1, true));
+ localizer.addReferences(arrUser1Keys, user1, "topo1"); // references are taken on key1 and archive1 for user1 only
+
+ LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 3, lrsrcSet.getSize()); // key1, key2 and archive1
+ assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+ LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
+ assertNotNull("Local resource doesn't exist but should", key1rsrc);
+ assertEquals("key doesn't match", key1, key1rsrc.getKey());
+ assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+ LocalizedResource key2rsrc = lrsrcSet.get(key2, false);
+ assertNotNull("Local resource doesn't exist but should", key2rsrc);
+ assertEquals("key doesn't match", key2, key2rsrc.getKey());
+ assertEquals("refcount doesn't match", 0, key2rsrc.getRefCount()); // key2 was never passed to addReferences
+ LocalizedResource archive1rsrc = lrsrcSet.get(archive1, true);
+ assertNotNull("Local resource doesn't exist but should", archive1rsrc);
+ assertEquals("key doesn't match", archive1, archive1rsrc.getKey());
+ assertEquals("refcount doesn't match", 1, archive1rsrc.getRefCount());
+
+ LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2); // no references were added for user2, so every ref count below is 0
+ assertEquals("local resource set size wrong", 3, lrsrcSet2.getSize());
+ assertEquals("user doesn't match", user2, lrsrcSet2.getUser());
+ LocalizedResource key3rsrc = lrsrcSet2.get(key3, false);
+ assertNotNull("Local resource doesn't exist but should", key3rsrc);
+ assertEquals("key doesn't match", key3, key3rsrc.getKey());
+ assertEquals("refcount doesn't match", 0, key3rsrc.getRefCount());
+ LocalizedResource key4rsrc = lrsrcSet2.get(key4, false);
+ assertNotNull("Local resource doesn't exist but should", key4rsrc);
+ assertEquals("key doesn't match", key4, key4rsrc.getKey());
+ assertEquals("refcount doesn't match", 0, key4rsrc.getRefCount());
+ LocalizedResource archive2rsrc = lrsrcSet2.get(archive2, true);
+ assertNotNull("Local resource doesn't exist but should", archive2rsrc);
+ assertEquals("key doesn't match", archive2, archive2rsrc.getKey());
+ assertEquals("refcount doesn't match", 0, archive2rsrc.getRefCount());
+ }
+ /** Runs testArchives on localizer/localtestwithsymlink.tgz with symlink support and an expected size of 21344. */
+ @Test
+ public void testArchivesTgz() throws Exception {
+ testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tgz")), true, 21344);
+ }
+ /** Runs testArchives on localizer/localtest.zip without symlink support and an expected size of 21348. */
+ @Test
+ public void testArchivesZip() throws Exception {
+ testArchives(getFileFromResource(joinPath("localizer", "localtest.zip")), false, 21348);
+ }
+ /** Runs testArchives on localizer/localtestwithsymlink.tar.gz with symlink support and an expected size of 21344. */
+ @Test
+ public void testArchivesTarGz() throws Exception {
+ testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar.gz")), true, 21344);
+ }
+ /** Runs testArchives on localizer/localtestwithsymlink.tar with symlink support and an expected size of 21344. */
+ @Test
+ public void testArchivesTar() throws Exception {
+ testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar")), true, 21344);
+ }
+ /** Runs testArchives on localizer/localtestwithsymlink.jar without symlink support and an expected size of 21416. */
+ @Test
+ public void testArchivesJar() throws Exception {
+ testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.jar")), false, 21416);
+ }
+ /** Resolves a resource path through this class's class loader and wraps the resulting URL's file part in a File. */
+ private File getFileFromResource(String archivePath) {
+ ClassLoader classLoader = getClass().getClassLoader();
+ return new File(classLoader.getResource(archivePath).getFile()); // NOTE(review): getResource returns null for a missing resource, which would surface here as a NullPointerException
+ }
+ /** Shared body of the archive tests: localizes one archive blob, checks it was unpacked, drops the reference, then checks cleanup removes it. */
+ // the archive must contain an entry named tmptestsymlink (asserted to be a real symlink when supportSymlinks is true)
+ public void testArchives(File archiveFile, boolean supportSymlinks, int size) throws Exception {
+ if (Utils.isOnWindows()) {
+ // On Windows this is forced to false because symlinks inside compressed files don't work properly.
+ supportSymlinks = false;
+ }
+
+ Map<String, Object> conf = new HashMap();
+ // set the cleanup interval really high so periodic cleanup doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+ String key1 = archiveFile.getName(); // the blob key is the archive's file name
+ String topo1 = "topo1";
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ // set the target cache size really small so the explicit cleanup() below removes the blob
+ localizer.setTargetCacheSize(1);
+
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+ when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+
+ when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new
+ FileInputStream(archiveFile.getAbsolutePath()))); // the mock blob store serves the archive bytes from disk
+
+ long timeBefore = System.nanoTime();
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
+ LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, true), user1, topo1,
+ user1Dir);
+ long timeAfter = System.nanoTime();
+
+ String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+ String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.ARCHIVESDIR);
+ assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+ File keyFile = new File(expectedFileDir, key1 + ".0"); // ".0" is presumably a version suffix - TODO confirm
+ assertTrue("blob not created", keyFile.exists());
+ assertTrue("blob is not uncompressed", keyFile.isDirectory());
+ File symlinkFile = new File(keyFile, "tmptestsymlink");
+
+ if (supportSymlinks) {
+ assertTrue("blob uncompressed doesn't contain symlink", Files.isSymbolicLink(
+ symlinkFile.toPath()));
+ } else {
+ assertTrue("blob symlink file doesn't exist", symlinkFile.exists());
+ }
+
+ LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+ assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+ LocalizedResource key1rsrc = lrsrcSet.get(key1, true);
+ assertNotNull("Local resource doesn't exist but should", key1rsrc);
+ assertEquals("key doesn't match", key1, key1rsrc.getKey());
+ assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+ assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePathWithVersion());
+ assertEquals("size doesn't match", size, key1rsrc.getSize());
+ assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+ .getLastAccessTime() <= timeAfter));
+
+ timeBefore = System.nanoTime();
+ localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, true);
+ timeAfter = System.nanoTime();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); // dropping the reference alone leaves the resource tracked
+ key1rsrc = lrsrcSet.get(key1, true);
+ assertNotNull("Local resource doesn't exist but should", key1rsrc);
+ assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
+ assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+ .getLastAccessTime() <= timeAfter));
+
+ // should remove the blob since cache size set really small
+ localizer.cleanup();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ assertFalse("blob contents not deleted", symlinkFile.exists());
+ assertFalse("blob not deleted", keyFile.exists());
+ assertFalse("blob file dir not deleted", new File(expectedFileDir).exists());
+ assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
+ assertNull("user set should be null", lrsrcSet);
+
+ }
+ /** Localizes one plain file blob, checks its bookkeeping, drops the reference, then checks cleanup removes the blob and its directories. */
+ @Test
+ public void testBasic() throws Exception {
+ Map<String, Object> conf = new HashMap();
+ // set the cleanup interval really high so periodic cleanup doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+ String key1 = "key1";
+ String topo1 = "topo1";
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ // set the target cache size really small so the explicit cleanup() below removes the blob
+ localizer.setTargetCacheSize(1);
+
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+ when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+
+ when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta()); // default test stream carries the 34-character test string
+
+ long timeBefore = System.nanoTime();
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
+ LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
+ user1Dir);
+ long timeAfter = System.nanoTime();
+
+ String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+ String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+ assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+ File keyFile = new File(expectedFileDir, key1);
+ File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+ assertTrue("blob not created", keyFileCurrentSymlink.exists());
+
+ LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+ assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+ LocalizedResource key1rsrc = lrsrcSet.get(key1, false);
+ assertNotNull("Local resource doesn't exist but should", key1rsrc);
+ assertEquals("key doesn't match", key1, key1rsrc.getKey());
+ assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount());
+ assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePath());
+ assertEquals("size doesn't match", 34, key1rsrc.getSize());
+ assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+ .getLastAccessTime() <= timeAfter));
+
+ timeBefore = System.nanoTime();
+ localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+ timeAfter = System.nanoTime();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); // dropping the reference alone leaves the resource tracked
+ key1rsrc = lrsrcSet.get(key1, false);
+ assertNotNull("Local resource doesn't exist but should", key1rsrc);
+ assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount());
+ assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc
+ .getLastAccessTime() <= timeAfter));
+
+ // should remove the blob since cache size set really small
+ localizer.cleanup();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ assertNull("user set should be null", lrsrcSet);
+ assertFalse("blob not deleted", keyFile.exists());
+ assertFalse("blob dir not deleted", new File(expectedFileDir).exists());
+ assertFalse("blob dir not deleted", new File(expectedUserDir).exists());
+ }
+ /** Localizes three blobs for one user and checks which ones cleanup() removes as the target cache size shrinks. */
+ @Test
+ public void testMultipleKeysOneUser() throws Exception {
+ Map<String, Object> conf = new HashMap();
+ // set the cleanup interval really high so periodic cleanup doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+ String key1 = "key1";
+ String topo1 = "topo1";
+ String key2 = "key2";
+ String key3 = "key3";
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ // set to keep 2 blobs (each of size 34)
+ localizer.setTargetCacheSize(68);
+
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+ when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+ when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+ when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta());
+ when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta());
+
+ List<LocalResource> keys = Arrays.asList(new LocalResource[]{new LocalResource(key1, false),
+ new LocalResource(key2, false), new LocalResource(key3, false)});
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
+
+ List<LocalizedResource> lrsrcs = localizer.getBlobs(keys, user1, topo1, user1Dir);
+ LocalizedResource lrsrc = lrsrcs.get(0); // the indexing below assumes results come back in request order
+ LocalizedResource lrsrc2 = lrsrcs.get(1);
+ LocalizedResource lrsrc3 = lrsrcs.get(2);
+
+ String expectedFileDir = joinPath(baseDir.toString(), USERCACHE, user1,
+ AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+ assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+ File keyFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File keyFile2 = new File(expectedFileDir, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ File keyFile3 = new File(expectedFileDir, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+ assertTrue("blob not created", keyFile.exists());
+ assertTrue("blob not created", keyFile2.exists());
+ assertTrue("blob not created", keyFile3.exists());
+ assertEquals("size doesn't match", 34, keyFile.length());
+ assertEquals("size doesn't match", 34, keyFile2.length());
+ assertEquals("size doesn't match", 34, keyFile3.length());
+ assertEquals("size doesn't match", 34, lrsrc.getSize());
+ assertEquals("size doesn't match", 34, lrsrc3.getSize());
+ assertEquals("size doesn't match", 34, lrsrc2.getSize());
+
+ LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 3, lrsrcSet.getSize());
+ assertEquals("user doesn't match", user1, lrsrcSet.getUser());
+
+ long timeBefore = System.nanoTime();
+ localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+ localizer.removeBlobReference(lrsrc2.getKey(), user1, topo1, false);
+ localizer.removeBlobReference(lrsrc3.getKey(), user1, topo1, false);
+ long timeAfter = System.nanoTime();
+
+ // add reference to one and then remove reference again so it has newer timestamp
+ lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+ assertTrue("timestamp not within range", (lrsrc.getLastAccessTime() >= timeBefore && lrsrc
+ .getLastAccessTime() <= timeAfter)); // NOTE(review): this getBlob runs after timeAfter, so the check presumably relies on the access time set by the removeBlobReference above - confirm
+ localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+
+ // should remove the second blob first
+ localizer.cleanup();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 2, lrsrcSet.getSize());
+ long end = System.currentTimeMillis() + 100; // poll for up to about 100 ms for the file to disappear
+ while ((end - System.currentTimeMillis()) >= 0 && keyFile2.exists()) {
+ Thread.sleep(1);
+ }
+ assertFalse("blob not deleted", keyFile2.exists());
+ assertTrue("blob deleted", keyFile.exists());
+ assertTrue("blob deleted", keyFile3.exists());
+
+ // set size to cleanup another one
+ localizer.setTargetCacheSize(34);
+
+ // should remove the third blob
+ localizer.cleanup();
+
+ lrsrcSet = localizer.getUserResources().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+ assertTrue("blob deleted", keyFile.exists());
+ assertFalse("blob not deleted", keyFile3.exists());
+ }
+ /** Expects getBlob to throw AuthorizationException when ACL validation is enabled and the blob's ACL grants user1 only ADMIN. */
+ @Test(expected = AuthorizationException.class)
+ public void testFailAcls() throws Exception {
+ Map<String, Object> conf = new HashMap();
+ // set the cleanup interval really high so periodic cleanup doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
+ // enable blobstore acl validation
+ conf.put(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED, true);
+
+ String topo1 = "topo1";
+ String key1 = "key1";
+ AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ // set acl so user doesn't have read access
+ AccessControl acl = new AccessControl(AccessControlType.USER, BlobStoreAclHandler.ADMIN);
+ acl.set_name(user1);
+ rbm.set_settable(new SettableBlobMeta(Arrays.asList(acl)));
+ when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+ when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
+
+ // This should throw AuthorizationException because auth failed
+ localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+ }
+ /** Expects KeyNotFoundException from a spied LocalFsBlobStore when getBlob is called for a key that was never stored here. */
+ @Test(expected = KeyNotFoundException.class)
+ public void testKeyNotFoundException() throws Exception {
+ Map<String, Object> conf = Utils.readStormConfig();
+ String key1 = "key1";
+ conf.put(Config.STORM_LOCAL_DIR, "target"); // blob store state goes under the relative "target" directory
+ LocalFsBlobStore bs = new LocalFsBlobStore();
+ LocalFsBlobStore spy = spy(bs);
+ Mockito.doReturn(true).when(spy).checkForBlobOrDownload(key1); // doReturn/doNothing stub the spy without running the real methods
+ Mockito.doNothing().when(spy).checkForBlobUpdate(key1);
+ spy.prepare(conf,null,null);
+ spy.getBlob(key1, null);
+ }
+
+ /**
+  * Localizes blobs for three different users and checks that each user gets
+  * its own file cache directory and resource set. Then drops user1's only
+  * reference, runs cleanup, and checks that user1's blob and directories are
+  * removed while the blobs of user2 and user3 (including user3's own copy of
+  * key1) are left in place.
+  */
+ @Test
+ public void testMultipleUsers() throws Exception {
+     Map<String, Object> conf = new HashMap<String, Object>();
+     // set clean time really high so doesn't kick in
+     conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+     String topo1 = "topo1";
+     String topo2 = "topo2";
+     String topo3 = "topo3";
+     String key1 = "key1";
+     String key2 = "key2";
+     String key3 = "key3";
+     AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+     // set to keep 2 blobs (each of size 34)
+     localizer.setTargetCacheSize(68);
+
+     // Every key resolves to the same world-accessible metadata; each getBlob
+     // stub hands back its own fresh stream instance.
+     ReadableBlobMeta rbm = new ReadableBlobMeta();
+     rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+     when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+     when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+     when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta());
+     when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta());
+
+     File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+     assertTrue("failed to create user dir", user1Dir.mkdirs());
+     File user2Dir = localizer.getLocalUserFileCacheDir(user2);
+     assertTrue("failed to create user dir", user2Dir.mkdirs());
+     File user3Dir = localizer.getLocalUserFileCacheDir(user3);
+     assertTrue("failed to create user dir", user3Dir.mkdirs());
+
+     // Only user1's resource is needed later (to release its reference); the
+     // other downloads are performed purely for their side effects.
+     LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
+         user1Dir);
+     localizer.getBlob(new LocalResource(key2, false), user2, topo2, user2Dir);
+     localizer.getBlob(new LocalResource(key3, false), user3, topo3, user3Dir);
+     // make sure we support different user reading same blob
+     localizer.getBlob(new LocalResource(key1, false), user3, topo3, user3Dir);
+
+     String expectedUserDir1 = joinPath(baseDir.toString(), USERCACHE, user1);
+     String expectedFileDirUser1 = joinPath(expectedUserDir1, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+     String expectedFileDirUser2 = joinPath(baseDir.toString(), USERCACHE, user2,
+         AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+     String expectedFileDirUser3 = joinPath(baseDir.toString(), USERCACHE, user3,
+         AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+     assertTrue("user filecache dir user1 not created", new File(expectedFileDirUser1).exists());
+     assertTrue("user filecache dir user2 not created", new File(expectedFileDirUser2).exists());
+     assertTrue("user filecache dir user3 not created", new File(expectedFileDirUser3).exists());
+
+     File keyFile = new File(expectedFileDirUser1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+     File keyFile2 = new File(expectedFileDirUser2, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+     File keyFile3 = new File(expectedFileDirUser3, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+     File keyFile1user3 = new File(expectedFileDirUser3, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+
+     assertTrue("blob not created", keyFile.exists());
+     assertTrue("blob not created", keyFile2.exists());
+     assertTrue("blob not created", keyFile3.exists());
+     assertTrue("blob not created", keyFile1user3.exists());
+
+     // user3 asked for both key3 and key1, so its set holds two entries.
+     LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+     assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+     LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2);
+     assertEquals("local resource set size wrong", 1, lrsrcSet2.getSize());
+     LocalizedResourceSet lrsrcSet3 = localizer.getUserResources().get(user3);
+     assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
+
+     localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false);
+     // should remove user1's copy of key1 only; user3's copy of key1 must survive
+     localizer.cleanup();
+
+     lrsrcSet = localizer.getUserResources().get(user1);
+     lrsrcSet3 = localizer.getUserResources().get(user3);
+     assertNull("user set should be null", lrsrcSet);
+     assertFalse("blob dir not deleted", new File(expectedFileDirUser1).exists());
+     assertFalse("blob dir not deleted", new File(expectedUserDir1).exists());
+     assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize());
+
+     assertTrue("blob deleted", keyFile2.exists());
+     assertFalse("blob not deleted", keyFile.exists());
+     assertTrue("blob deleted", keyFile3.exists());
+     assertTrue("blob deleted", keyFile1user3.exists());
+ }
+
+ /**
+  * Checks that a localized blob follows the version reported by the blob
+  * store: version 1 on the first download, version 2 when a second topology
+  * requests the same key through getBlob, and version 3 after an explicit
+  * updateBlobs call.
+  */
+ @Test
+ public void testUpdate() throws Exception {
+     Map<String, Object> conf = new HashMap<String, Object>();
+     // set clean time really high so doesn't kick in
+     conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000);
+
+     String key1 = "key1";
+     String topo1 = "topo1";
+     String topo2 = "topo2";
+     AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+
+     // The same metadata object is returned by the mock on every lookup, so
+     // bumping its version below changes what the localizer sees next time.
+     ReadableBlobMeta rbm = new ReadableBlobMeta();
+     rbm.set_version(1);
+     rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING));
+     when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+     when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta());
+
+     File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+     assertTrue("failed to create user dir", user1Dir.mkdirs());
+     // initial download for topo1; the returned resource itself is not needed
+     localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir);
+
+     String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+     String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR);
+     assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+     File keyFile = new File(expectedFileDir, key1);
+     File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
+     assertTrue("blob not created", keyFileCurrentSymlink.exists());
+     File versionFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_BLOB_VERSION_SUFFIX);
+     assertTrue("blob version file not created", versionFile.exists());
+     assertEquals("blob version not correct", 1, ServerUtils.localVersionOfBlob(keyFile.toString()));
+
+     LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1);
+     assertEquals("local resource set size wrong", 1, lrsrcSet.getSize());
+
+     // test another topology getting blob with updated version - it should update version now
+     rbm.set_version(2);
+
+     localizer.getBlob(new LocalResource(key1, false), user1, topo2, user1Dir);
+     assertTrue("blob version file not created", versionFile.exists());
+     assertEquals("blob version not correct", 2, ServerUtils.localVersionOfBlob(keyFile.toString()));
+     assertTrue("blob file with version 2 not created", new File(keyFile + ".2").exists());
+
+     // now test regular updateBlob
+     rbm.set_version(3);
+
+     ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
+     arr.add(new LocalResource(key1, false));
+     localizer.updateBlobs(arr, user1);
+     assertTrue("blob version file not created", versionFile.exists());
+     assertEquals("blob version not correct", 3, ServerUtils.localVersionOfBlob(keyFile.toString()));
+     assertTrue("blob file with version 3 not created", new File(keyFile + ".3").exists());
+ }
}