You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/12/04 16:03:59 UTC
[04/17] storm git commit: Blobstore API STORM- 876
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/localizer/Localizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/localizer/Localizer.java b/storm-core/src/jvm/backtype/storm/localizer/Localizer.java
new file mode 100644
index 0000000..ef5684f
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/localizer/Localizer.java
@@ -0,0 +1,695 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.localizer;
+
+import backtype.storm.Config;
+import backtype.storm.blobstore.ClientBlobStore;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.utils.ShellUtils.ExitCodeException;
+import backtype.storm.utils.ShellUtils.ShellCommandExecutor;
+import backtype.storm.utils.Utils;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+
+/**
+ * Class to download and manage files from the blobstore. It uses an LRU cache
+ * to determine which files to keep so they can be reused and which files to delete.
+ */
+public class Localizer {
+ public static final Logger LOG = LoggerFactory.getLogger(Localizer.class);
+
+ private Map _conf;
+ private int _threadPoolSize;
+ // thread pool for initial download
+ private ExecutorService _execService;
+ // thread pool for updates
+ private ExecutorService _updateExecService;
+ private int _blobDownloadRetries;
+
+ // track resources - user to resourceSet
+ private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new
+ ConcurrentHashMap<String, LocalizedResourceSet>();
+
+ private String _localBaseDir;
+ public static final String USERCACHE = "usercache";
+ public static final String FILECACHE = "filecache";
+
+ // sub directories to store either files or uncompressed archives respectively
+ public static final String FILESDIR = "files";
+ public static final String ARCHIVESDIR = "archives";
+
+ private static final String TO_UNCOMPRESS = "_tmp_";
+
+ // cleanup
+ private long _cacheTargetSize;
+ private long _cacheCleanupPeriod;
+ private ScheduledExecutorService _cacheCleanupService;
+
+ public Localizer(Map conf, String baseDir) {
+ _conf = conf;
+ _localBaseDir = baseDir;
+ // default cache size 10GB, converted to Bytes
+ _cacheTargetSize = Utils.getInt(_conf.get(Config.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB),
+ 10 * 1024).longValue() << 20;
+ // default 10 minutes.
+ _cacheCleanupPeriod = Utils.getInt(_conf.get(
+ Config.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 10 * 60 * 1000).longValue();
+
+ // if we needed we could make config for update thread pool size
+ _threadPoolSize = Utils.getInt(_conf.get(Config.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
+ _blobDownloadRetries = Utils.getInt(_conf.get(
+ Config.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
+
+ _execService = Executors.newFixedThreadPool(_threadPoolSize);
+ _updateExecService = Executors.newFixedThreadPool(_threadPoolSize);
+ reconstructLocalizedResources();
+ }
+
+ // For testing, it allows setting size in bytes
+ protected void setTargetCacheSize(long size) {
+ _cacheTargetSize = size;
+ }
+
+ // For testing, be careful as it doesn't clone
+ ConcurrentMap<String, LocalizedResourceSet> getUserResources() {
+ return _userRsrc;
+ }
+
+ public void startCleaner() {
+ _cacheCleanupService = new ScheduledThreadPoolExecutor(1,
+ new ThreadFactoryBuilder()
+ .setNameFormat("Localizer Cache Cleanup")
+ .build());
+
+ _cacheCleanupService.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ handleCacheCleanup();
+ }
+ }, _cacheCleanupPeriod, _cacheCleanupPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ public void shutdown() {
+ if (_cacheCleanupService != null) {
+ _cacheCleanupService.shutdown();
+ }
+ if (_execService != null) {
+ _execService.shutdown();
+ }
+ if (_updateExecService != null) {
+ _updateExecService.shutdown();
+ }
+ }
+
+ // baseDir/supervisor/usercache/
+ protected File getUserCacheDir() {
+ return new File(_localBaseDir, USERCACHE);
+ }
+
+ // baseDir/supervisor/usercache/user1/
+ protected File getLocalUserDir(String userName) {
+ return new File(getUserCacheDir(), userName);
+ }
+
+ // baseDir/supervisor/usercache/user1/filecache
+ public File getLocalUserFileCacheDir(String userName) {
+ return new File(getLocalUserDir(userName), FILECACHE);
+ }
+
+ // baseDir/supervisor/usercache/user1/filecache/files
+ protected File getCacheDirForFiles(File dir) {
+ return new File(dir, FILESDIR);
+ }
+
+ // get the directory to put uncompressed archives in
+ // baseDir/supervisor/usercache/user1/filecache/archives
+ protected File getCacheDirForArchives(File dir) {
+ return new File(dir, ARCHIVESDIR);
+ }
+
+ protected void addLocalizedResourceInDir(String dir, LocalizedResourceSet lrsrcSet,
+ boolean uncompress) {
+ File[] lrsrcs = readCurrentBlobs(dir);
+
+ if (lrsrcs != null) {
+ for (File rsrc : lrsrcs) {
+ LOG.info("add localized in dir found: " + rsrc);
+ /// strip off .suffix
+ String path = rsrc.getPath();
+ int p = path.lastIndexOf('.');
+ if (p > 0) {
+ path = path.substring(0, p);
+ }
+ LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path);
+ LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path,
+ uncompress);
+ lrsrcSet.addResource(lrsrc.getKey(), lrsrc, uncompress);
+ }
+ }
+ }
+
+ protected File[] readDirContents(String location) {
+ File dir = new File(location);
+ File[] files = null;
+ if (dir.exists()) {
+ files = dir.listFiles();
+ }
+ return files;
+ }
+
+ // Looks for files in the directory with .current suffix
+ protected File[] readCurrentBlobs(String location) {
+ File dir = new File(location);
+ File[] files = null;
+ if (dir.exists()) {
+ files = dir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.toLowerCase().endsWith(Utils.DEFAULT_CURRENT_BLOB_SUFFIX);
+ }
+ });
+ }
+ return files;
+ }
+
+ // Check to see if there are any existing files already localized.
+ protected void reconstructLocalizedResources() {
+ try {
+ LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath());
+ File[] users = readDirContents(getUserCacheDir().getPath());
+
+ if (users != null) {
+ for (File userDir : users) {
+ String user = userDir.getName();
+ LOG.debug("looking in: {} for user: {}", userDir.getPath(), user);
+ LocalizedResourceSet newSet = new LocalizedResourceSet(user);
+ LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
+ if (lrsrcSet == null) {
+ lrsrcSet = newSet;
+ }
+ addLocalizedResourceInDir(getCacheDirForFiles(getLocalUserFileCacheDir(user)).getPath(),
+ lrsrcSet, false);
+ addLocalizedResourceInDir(
+ getCacheDirForArchives(getLocalUserFileCacheDir(user)).getPath(),
+ lrsrcSet, true);
+ }
+ } else {
+ LOG.warn("No left over resources found for any user during reconstructing of local resources at: {}", getUserCacheDir().getPath());
+ }
+ } catch (Exception e) {
+ LOG.error("ERROR reconstructing localized resources", e);
+ }
+ }
+
+ // ignores invalid user/topo/key
+ public synchronized void removeBlobReference(String key, String user, String topo,
+ boolean uncompress) throws AuthorizationException, KeyNotFoundException {
+ LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
+ if (lrsrcSet != null) {
+ LocalizedResource lrsrc = lrsrcSet.get(key, uncompress);
+ if (lrsrc != null) {
+ LOG.debug("removing blob reference to: {} for topo: {}", key, topo);
+ lrsrc.removeReference(topo);
+ } else {
+ LOG.warn("trying to remove non-existent blob, key: " + key + " for user: " + user +
+ " topo: " + topo);
+ }
+ } else {
+ LOG.warn("trying to remove blob for non-existent resource set for user: " + user + " key: "
+ + key + " topo: " + topo);
+ }
+ }
+
+ public synchronized void addReferences(List<LocalResource> localresource, String user,
+ String topo) {
+ LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
+ if (lrsrcSet != null) {
+ for (LocalResource blob : localresource) {
+ LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName(), blob.shouldUncompress());
+ if (lrsrc != null) {
+ lrsrc.addReference(topo);
+ LOG.debug("added reference for topo: {} key: {}", topo, blob);
+ } else {
+ LOG.warn("trying to add reference to non-existent blob, key: " + blob + " topo: " + topo);
+ }
+ }
+ } else {
+ LOG.warn("trying to add reference to non-existent local resource set, " +
+ "user: " + user + " topo: " + topo);
+ }
+ }
+
+ /**
+ * This function either returns the blob in the existing cache or if it doesn't exist in the
+ * cache, it will download the blob and will block until the download is complete.
+ */
+ public LocalizedResource getBlob(LocalResource localResource, String user, String topo,
+ File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException {
+ ArrayList<LocalResource> arr = new ArrayList<LocalResource>();
+ arr.add(localResource);
+ List<LocalizedResource> results = getBlobs(arr, user, topo, userFileDir);
+ if (results.isEmpty() || results.size() != 1) {
+ throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + user +
+ ", topo: " + topo);
+ }
+ return results.get(0);
+ }
+
+ protected boolean isLocalizedResourceDownloaded(LocalizedResource lrsrc) {
+ File rsrcFileCurrent = new File(lrsrc.getCurrentSymlinkPath());
+ File rsrcFileWithVersion = new File(lrsrc.getFilePathWithVersion());
+ File versionFile = new File(lrsrc.getVersionFilePath());
+ return (rsrcFileWithVersion.exists() && rsrcFileCurrent.exists() && versionFile.exists());
+ }
+
+ protected boolean isLocalizedResourceUpToDate(LocalizedResource lrsrc,
+ ClientBlobStore blobstore) throws AuthorizationException, KeyNotFoundException {
+ String localFile = lrsrc.getFilePath();
+ long nimbusBlobVersion = Utils.nimbusVersionOfBlob(lrsrc.getKey(), blobstore);
+ long currentBlobVersion = Utils.localVersionOfBlob(localFile);
+ return (nimbusBlobVersion == currentBlobVersion);
+ }
+
+ protected ClientBlobStore getClientBlobStore() {
+ return Utils.getClientBlobStoreForSupervisor(_conf);
+ }
+
+ /**
+ * This function updates blobs on the supervisor. It uses a separate thread pool and runs
+ * asynchronously of the download and delete.
+ */
+ public List<LocalizedResource> updateBlobs(List<LocalResource> localResources,
+ String user) throws AuthorizationException, KeyNotFoundException, IOException {
+ LocalizedResourceSet lrsrcSet = _userRsrc.get(user);
+ ArrayList<LocalizedResource> results = new ArrayList<>();
+ ArrayList<Callable<LocalizedResource>> updates = new ArrayList<>();
+
+ if (lrsrcSet == null) {
+ // resource set must have been removed
+ return results;
+ }
+ ClientBlobStore blobstore = null;
+ try {
+ blobstore = getClientBlobStore();
+ for (LocalResource localResource: localResources) {
+ String key = localResource.getBlobName();
+ LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
+ if (lrsrc == null) {
+ LOG.warn("blob requested for update doesn't exist: {}", key);
+ continue;
+ } else {
+ // update it if either the version isn't the latest or if any local blob files are missing
+ if (!isLocalizedResourceUpToDate(lrsrc, blobstore) ||
+ !isLocalizedResourceDownloaded(lrsrc)) {
+ LOG.debug("updating blob: {}", key);
+ updates.add(new DownloadBlob(this, _conf, key, new File(lrsrc.getFilePath()), user,
+ lrsrc.isUncompressed(), true));
+ }
+ }
+ }
+ } finally {
+ if(blobstore != null) {
+ blobstore.shutdown();
+ }
+ }
+ try {
+ List<Future<LocalizedResource>> futures = _updateExecService.invokeAll(updates);
+ for (Future<LocalizedResource> futureRsrc : futures) {
+ try {
+ LocalizedResource lrsrc = futureRsrc.get();
+ // put the resource just in case it was removed at same time by the cleaner
+ LocalizedResourceSet newSet = new LocalizedResourceSet(user);
+ LocalizedResourceSet newlrsrcSet = _userRsrc.putIfAbsent(user, newSet);
+ if (newlrsrcSet == null) {
+ newlrsrcSet = newSet;
+ }
+ newlrsrcSet.updateResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+ results.add(lrsrc);
+ }
+ catch (ExecutionException e) {
+ LOG.error("Error updating blob: ", e);
+ if (e.getCause() instanceof AuthorizationException) {
+ throw (AuthorizationException)e.getCause();
+ }
+ if (e.getCause() instanceof KeyNotFoundException) {
+ throw (KeyNotFoundException)e.getCause();
+ }
+ }
+ }
+ } catch (RejectedExecutionException re) {
+ LOG.error("Error updating blobs : ", re);
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted Exception", ie);
+ }
+ return results;
+ }
+
+ /**
+ * This function either returns the blobs in the existing cache or if they don't exist in the
+ * cache, it downloads them in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT)
+ * and will block until all of them have been downloaded
+ */
+ public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources,
+ String user, String topo, File userFileDir)
+ throws AuthorizationException, KeyNotFoundException, IOException {
+
+ LocalizedResourceSet newSet = new LocalizedResourceSet(user);
+ LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet);
+ if (lrsrcSet == null) {
+ lrsrcSet = newSet;
+ }
+ ArrayList<LocalizedResource> results = new ArrayList<>();
+ ArrayList<Callable<LocalizedResource>> downloads = new ArrayList<>();
+
+ ClientBlobStore blobstore = null;
+ try {
+ blobstore = getClientBlobStore();
+ for (LocalResource localResource: localResources) {
+ String key = localResource.getBlobName();
+ boolean uncompress = localResource.shouldUncompress();
+ LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress());
+ boolean isUpdate = false;
+ if ((lrsrc != null) && (lrsrc.isUncompressed() == localResource.shouldUncompress()) &&
+ (isLocalizedResourceDownloaded(lrsrc))) {
+ if (isLocalizedResourceUpToDate(lrsrc, blobstore)) {
+ LOG.debug("blob already exists: {}", key);
+ lrsrc.addReference(topo);
+ results.add(lrsrc);
+ continue;
+ }
+ LOG.debug("blob exists but isn't up to date: {}", key);
+ isUpdate = true;
+ }
+
+ // go off to blobstore and get it
+ // assume dir passed in exists and has correct permission
+ LOG.debug("fetching blob: {}", key);
+ File downloadDir = getCacheDirForFiles(userFileDir);
+ File localFile = new File(downloadDir, key);
+ if (uncompress) {
+ // for compressed file, download to archives dir
+ downloadDir = getCacheDirForArchives(userFileDir);
+ localFile = new File(downloadDir, key);
+ }
+ downloadDir.mkdir();
+ downloads.add(new DownloadBlob(this, _conf, key, localFile, user, uncompress,
+ isUpdate));
+ }
+ } finally {
+ if(blobstore !=null) {
+ blobstore.shutdown();
+ }
+ }
+ try {
+ List<Future<LocalizedResource>> futures = _execService.invokeAll(downloads);
+ for (Future<LocalizedResource> futureRsrc: futures) {
+ LocalizedResource lrsrc = futureRsrc.get();
+ lrsrc.addReference(topo);
+ lrsrcSet.addResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+ results.add(lrsrc);
+ }
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof AuthorizationException)
+ throw (AuthorizationException)e.getCause();
+ else if (e.getCause() instanceof KeyNotFoundException) {
+ throw (KeyNotFoundException)e.getCause();
+ } else {
+ throw new IOException("Error getting blobs", e);
+ }
+ } catch (RejectedExecutionException re) {
+ throw new IOException("RejectedExecutionException: ", re);
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted Exception", ie);
+ }
+ return results;
+ }
+
+ static class DownloadBlob implements Callable<LocalizedResource> {
+
+ private Localizer _localizer;
+ private Map _conf;
+ private String _key;
+ private File _localFile;
+ private String _user;
+ private boolean _uncompress;
+ private boolean _isUpdate;
+
+ public DownloadBlob(Localizer localizer, Map conf, String key, File localFile,
+ String user, boolean uncompress, boolean update) {
+ _localizer = localizer;
+ _conf = conf;
+ _key = key;
+ _localFile = localFile;
+ _user = user;
+ _uncompress = uncompress;
+ _isUpdate = update;
+ }
+
+ @Override
+ public LocalizedResource call()
+ throws AuthorizationException, KeyNotFoundException, IOException {
+ return _localizer.downloadBlob(_conf, _key, _localFile, _user, _uncompress,
+ _isUpdate);
+ }
+ }
+
+ private LocalizedResource downloadBlob(Map conf, String key, File localFile,
+ String user, boolean uncompress, boolean isUpdate)
+ throws AuthorizationException, KeyNotFoundException, IOException {
+ ClientBlobStore blobstore = null;
+ try {
+ blobstore = getClientBlobStore();
+ long nimbusBlobVersion = Utils.nimbusVersionOfBlob(key, blobstore);
+ long oldVersion = Utils.localVersionOfBlob(localFile.toString());
+ FileOutputStream out = null;
+ PrintWriter writer = null;
+ int numTries = 0;
+ String localizedPath = localFile.toString();
+ String localFileWithVersion = Utils.constructBlobWithVersionFileName(localFile.toString(),
+ nimbusBlobVersion);
+ String localVersionFile = Utils.constructVersionFileName(localFile.toString());
+ String downloadFile = localFileWithVersion;
+ if (uncompress) {
+ // we need to download to temp file and then unpack into the one requested
+ downloadFile = new File(localFile.getParent(), TO_UNCOMPRESS + localFile.getName()).toString();
+ }
+ while (numTries < _blobDownloadRetries) {
+ out = new FileOutputStream(downloadFile);
+ numTries++;
+ try {
+ if (!Utils.canUserReadBlob(blobstore.getBlobMeta(key), user)) {
+ throw new AuthorizationException(user + " does not have READ access to " + key);
+ }
+ InputStreamWithMeta in = blobstore.getBlob(key);
+ byte[] buffer = new byte[1024];
+ int len;
+ while ((len = in.read(buffer)) >= 0) {
+ out.write(buffer, 0, len);
+ }
+ out.close();
+ in.close();
+ if (uncompress) {
+ Utils.unpack(new File(downloadFile), new File(localFileWithVersion));
+ LOG.debug("uncompressed " + downloadFile + " to: " + localFileWithVersion);
+ }
+
+ // Next write the version.
+ LOG.info("Blob: " + key + " updated with new Nimbus-provided version: " +
+ nimbusBlobVersion + " local version was: " + oldVersion);
+ // The false parameter ensures overwriting the version file, not appending
+ writer = new PrintWriter(
+ new BufferedWriter(new FileWriter(localVersionFile, false)));
+ writer.println(nimbusBlobVersion);
+ writer.close();
+
+ try {
+ setBlobPermissions(conf, user, localFileWithVersion);
+ setBlobPermissions(conf, user, localVersionFile);
+
+ // Update the key.current symlink. First create tmp symlink and do
+ // move of tmp to current so that the operation is atomic.
+ String tmp_uuid_local = java.util.UUID.randomUUID().toString();
+ LOG.debug("Creating a symlink @" + localFile + "." + tmp_uuid_local + " , " +
+ "linking to: " + localFile + "." + nimbusBlobVersion);
+ File uuid_symlink = new File(localFile + "." + tmp_uuid_local);
+
+ Files.createSymbolicLink(uuid_symlink.toPath(),
+ Paths.get(Utils.constructBlobWithVersionFileName(localFile.toString(),
+ nimbusBlobVersion)));
+ File current_symlink = new File(Utils.constructBlobCurrentSymlinkName(
+ localFile.toString()));
+ Files.move(uuid_symlink.toPath(), current_symlink.toPath(), ATOMIC_MOVE);
+ } catch (IOException e) {
+ // if we fail after writing the version file but before we move current link we need to
+ // restore the old version to the file
+ try {
+ PrintWriter restoreWriter = new PrintWriter(
+ new BufferedWriter(new FileWriter(localVersionFile, false)));
+ restoreWriter.println(oldVersion);
+ restoreWriter.close();
+ } catch (IOException ignore) {}
+ throw e;
+ }
+
+ String oldBlobFile = localFile + "." + oldVersion;
+ try {
+ // Remove the old version. Note that if a number of processes have that file open,
+ // the OS will keep the old blob file around until they all close the handle and only
+ // then deletes it. No new process will open the old blob, since the users will open the
+ // blob through the "blob.current" symlink, which always points to the latest version of
+ // a blob. Remove the old version after the current symlink is updated as to not affect
+ // anyone trying to read it.
+ if ((oldVersion != -1) && (oldVersion != nimbusBlobVersion)) {
+ LOG.info("Removing an old blob file:" + oldBlobFile);
+ Files.delete(Paths.get(oldBlobFile));
+ }
+ } catch (IOException e) {
+ // At this point we have downloaded everything and moved symlinks. If the remove of
+ // old fails just log an error
+ LOG.error("Exception removing old blob version: " + oldBlobFile);
+ }
+
+ break;
+ } catch (AuthorizationException ae) {
+ // we consider this non-retriable exceptions
+ if (out != null) {
+ out.close();
+ }
+ new File(downloadFile).delete();
+ throw ae;
+ } catch (IOException | KeyNotFoundException e) {
+ if (out != null) {
+ out.close();
+ }
+ if (writer != null) {
+ writer.close();
+ }
+ new File(downloadFile).delete();
+ if (uncompress) {
+ try {
+ FileUtils.deleteDirectory(new File(localFileWithVersion));
+ } catch (IOException ignore) {}
+ }
+ if (!isUpdate) {
+ // don't want to remove existing version file if its an update
+ new File(localVersionFile).delete();
+ }
+
+ if (numTries < _blobDownloadRetries) {
+ LOG.error("Failed to download blob, retrying", e);
+ } else {
+ throw e;
+ }
+ }
+ }
+ return new LocalizedResource(key, localizedPath, uncompress);
+ } finally {
+ if(blobstore != null) {
+ blobstore.shutdown();
+ }
+ }
+ }
+
+ public void setBlobPermissions(Map conf, String user, String path)
+ throws IOException {
+
+ if (!Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ return;
+ }
+ String wlCommand = Utils.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), "");
+ if (wlCommand.isEmpty()) {
+ String stormHome = System.getProperty("storm.home");
+ wlCommand = stormHome + "/bin/worker-launcher";
+ }
+ List<String> command = new ArrayList<String>(Arrays.asList(wlCommand, user, "blob", path));
+
+ String[] commandArray = command.toArray(new String[command.size()]);
+ ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
+ LOG.info("Setting blob permissions, command: {}", Arrays.toString(commandArray));
+
+ try {
+ shExec.execute();
+ LOG.debug("output: {}", shExec.getOutput());
+ } catch (ExitCodeException e) {
+ int exitCode = shExec.getExitCode();
+ LOG.warn("Exit code from worker-launcher is : " + exitCode, e);
+ LOG.debug("output: {}", shExec.getOutput());
+ throw new IOException("Setting blob permissions failed" +
+ " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
+ }
+ }
+
+
+ public synchronized void handleCacheCleanup() {
+ LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(_cacheTargetSize);
+ // need one large set of all and then clean via LRU
+ for (LocalizedResourceSet t : _userRsrc.values()) {
+ toClean.addResources(t);
+ LOG.debug("Resources to be cleaned after adding {} : {}", t.getUser(), toClean);
+ }
+ toClean.cleanup();
+ LOG.debug("Resource cleanup: {}", toClean);
+ for (LocalizedResourceSet t : _userRsrc.values()) {
+ if (t.getSize() == 0) {
+ String user = t.getUser();
+
+ LOG.debug("removing empty set: {}", user);
+ File userFileCacheDir = getLocalUserFileCacheDir(user);
+ getCacheDirForFiles(userFileCacheDir).delete();
+ getCacheDirForArchives(userFileCacheDir).delete();
+ getLocalUserFileCacheDir(user).delete();
+ boolean dirsRemoved = getLocalUserDir(user).delete();
+ // to catch race with update thread
+ if (dirsRemoved) {
+ _userRsrc.remove(user);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/security/auth/NimbusPrincipal.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/NimbusPrincipal.java b/storm-core/src/jvm/backtype/storm/security/auth/NimbusPrincipal.java
new file mode 100644
index 0000000..c07ae84
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/security/auth/NimbusPrincipal.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.security.auth;
+
+import java.security.Principal;
+
+public class NimbusPrincipal implements Principal {
+
+ @Override
+ public String getName() {
+ return NimbusPrincipal.class.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/utils/BufferInputStream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/BufferInputStream.java b/storm-core/src/jvm/backtype/storm/utils/BufferInputStream.java
new file mode 100644
index 0000000..c718858
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/BufferInputStream.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+
+public class BufferInputStream {
+ byte[] buffer;
+ InputStream stream;
+
+ public BufferInputStream(InputStream stream, int bufferSize) {
+ this.stream = stream;
+ buffer = new byte[bufferSize];
+ }
+
+ public BufferInputStream(InputStream stream) {
+ this(stream, 15*1024);
+ }
+
+ public byte[] read() throws IOException {
+ int length = stream.read(buffer);
+ if(length==-1) {
+ close();
+ return new byte[0];
+ } else if(length==buffer.length) {
+ return buffer;
+ } else {
+ return Arrays.copyOf(buffer, length);
+ }
+ }
+
+ public void close() throws IOException {
+ stream.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java b/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
index 8595b71..55e3866 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellUtils.java
@@ -102,6 +102,13 @@ abstract public class ShellUtils {
this(interval, false);
}
+ /** get the exit code
+ * @return the exit code of the process
+ */
+ public int getExitCode() {
+ return exitCode;
+ }
+
/**
* @param interval the minimum duration to wait before re-executing the
* command.
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index c086be2..e0bbb1f 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -18,18 +18,35 @@
package backtype.storm.utils;
import backtype.storm.Config;
+import backtype.storm.blobstore.BlobStore;
+import backtype.storm.blobstore.BlobStoreAclHandler;
+import backtype.storm.blobstore.ClientBlobStore;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.blobstore.LocalFsBlobStore;
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.ComponentObject;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
import backtype.storm.generated.StormTopology;
+import backtype.storm.localizer.Localizer;
+import backtype.storm.nimbus.NimbusInfo;
import backtype.storm.serialization.DefaultSerializationDelegate;
import backtype.storm.serialization.SerializationDelegate;
import clojure.lang.IFn;
import clojure.lang.RT;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
@@ -39,13 +56,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
-
import java.net.URL;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.attribute.PosixFilePermission;
+
import java.io.File;
+import java.io.FileReader;
import java.io.FileInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
@@ -57,13 +77,21 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.FileOutputStream;
import java.io.BufferedReader;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.io.IOException;
+
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
import java.util.Map;
+import java.util.Set;
import java.util.Iterator;
import java.util.Enumeration;
import java.util.List;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.HashMap;
@@ -83,6 +111,9 @@ import org.apache.thrift.TSerializer;
public class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
public static final String DEFAULT_STREAM_ID = "default";
+ public static final String DEFAULT_BLOB_VERSION_SUFFIX = ".version";
+ public static final String CURRENT_BLOB_SUFFIX_ID = "current";
+ public static final String DEFAULT_CURRENT_BLOB_SUFFIX = "." + CURRENT_BLOB_SUFFIX_ID;
private static ThreadLocal<TSerializer> threadSer = new ThreadLocal<TSerializer>();
private static ThreadLocal<TDeserializer> threadDes = new ThreadLocal<TDeserializer>();
@@ -110,28 +141,6 @@ public class Utils {
return serializationDelegate.deserialize(serialized, clazz);
}
- public static byte[] thriftSerialize(TBase t) {
- try {
- TSerializer ser = threadSer.get();
- if (ser == null) {
- ser = new TSerializer();
- threadSer.set(ser);
- }
- return ser.serialize(t);
- } catch (TException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static TDeserializer getDes() {
- TDeserializer des = threadDes.get();
- if(des == null) {
- des = new TDeserializer();
- threadDes.set(des);
- }
- return des;
- }
-
public static <T> T thriftDeserialize(Class c, byte[] b, int offset, int length) {
try {
T ret = (T) c.newInstance();
@@ -143,18 +152,6 @@ public class Utils {
}
}
- public static <T> T thriftDeserialize(Class c, byte[] b) {
- try {
- T ret = (T) c.newInstance();
- TDeserializer des = getDes();
- des.deserialize((TBase) ret, b);
- return ret;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
-
public static byte[] javaSerialize(Object obj) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -431,38 +428,170 @@ public class Utils {
return ret;
}
- public static void downloadFromMaster(Map conf, String file, String localFile) throws AuthorizationException, IOException, TException {
- NimbusClient client = NimbusClient.getConfiguredClient(conf);
- try {
- download(client, file, localFile);
- } finally {
- client.close();
+
+ public static Localizer createLocalizer(Map conf, String baseDir) {
+ return new Localizer(conf, baseDir);
+ }
+
+ public static ClientBlobStore getClientBlobStoreForSupervisor(Map conf) {
+ ClientBlobStore store = (ClientBlobStore) newInstance(
+ (String) conf.get(Config.SUPERVISOR_BLOBSTORE));
+ store.prepare(conf);
+ return store;
+ }
+
+ public static BlobStore getNimbusBlobStore(Map conf, NimbusInfo nimbusInfo) {
+ return getNimbusBlobStore(conf, null, nimbusInfo);
+ }
+
+ public static BlobStore getNimbusBlobStore(Map conf, String baseDir, NimbusInfo nimbusInfo) {
+ String type = (String)conf.get(Config.NIMBUS_BLOBSTORE);
+ if (type == null) {
+ type = LocalFsBlobStore.class.getName();
}
+ BlobStore store = (BlobStore) newInstance(type);
+ HashMap nconf = new HashMap(conf);
+ // only enable cleanup of blobstore on nimbus
+ nconf.put(Config.BLOBSTORE_CLEANUP_ENABLE, Boolean.TRUE);
+ store.prepare(nconf, baseDir, nimbusInfo);
+ return store;
}
- public static void downloadFromHost(Map conf, String file, String localFile, String host, int port) throws IOException, TException, AuthorizationException {
- NimbusClient client = new NimbusClient (conf, host, port, null);
- try {
- download(client, file, localFile);
- } finally {
- client.close();
+ /**
+ * Meant to be called only by the supervisor for stormjar/stormconf/stormcode files.
+ * @param key
+ * @param localFile
+ * @param cb
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ * @throws IOException
+ */
+ public static void downloadResourcesAsSupervisor(String key, String localFile,
+ ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
+ final int MAX_RETRY_ATTEMPTS = 2;
+ final int ATTEMPTS_INTERVAL_TIME = 100;
+ for (int retryAttempts = 0; retryAttempts < MAX_RETRY_ATTEMPTS; retryAttempts++) {
+ if (downloadResourcesAsSupervisorAttempt(cb, key, localFile)) {
+ break;
+ }
+ Utils.sleep(ATTEMPTS_INTERVAL_TIME);
}
}
- private static void download(NimbusClient client, String file, String localFile) throws IOException, TException, AuthorizationException {
- WritableByteChannel out = Channels.newChannel(new FileOutputStream(localFile));
+ public static ClientBlobStore getClientBlobStore(Map conf) {
+ ClientBlobStore store = (ClientBlobStore) Utils.newInstance((String) conf.get(Config.CLIENT_BLOBSTORE));
+ store.prepare(conf);
+ return store;
+ }
+
+ private static boolean downloadResourcesAsSupervisorAttempt(ClientBlobStore cb, String key, String localFile) {
+ boolean isSuccess = false;
+ FileOutputStream out = null;
+ InputStreamWithMeta in = null;
try {
- String id = client.getClient().beginFileDownload(file);
- while (true) {
- ByteBuffer chunk = client.getClient().downloadChunk(id);
- int written = out.write(chunk);
- if (written == 0) break;
+ out = new FileOutputStream(localFile);
+ in = cb.getBlob(key);
+ long fileSize = in.getFileLength();
+
+ byte[] buffer = new byte[1024];
+ int len;
+ int downloadFileSize = 0;
+ while ((len = in.read(buffer)) >= 0) {
+ out.write(buffer, 0, len);
+ downloadFileSize += len;
}
+
+ isSuccess = (fileSize == downloadFileSize);
+ } catch (TException | IOException e) {
+ LOG.error("An exception happened while downloading {} from blob store.", localFile, e);
} finally {
- out.close();
+ try {
+ if (out != null) {
+ out.close();
+ }
+ } catch (IOException ignored) {}
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } catch (IOException ignored) {}
+ }
+ if (!isSuccess) {
+ try {
+ Files.deleteIfExists(Paths.get(localFile));
+ } catch (IOException ex) {
+ LOG.error("Failed trying to delete the partially downloaded {}", localFile, ex);
+ }
+ }
+ return isSuccess;
+ }
+
+ public static boolean checkFileExists(String dir, String file) {
+ return Files.exists(new File(dir, file).toPath());
+ }
+
+ public static long nimbusVersionOfBlob(String key, ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException {
+ long nimbusBlobVersion = 0;
+ ReadableBlobMeta metadata = cb.getBlobMeta(key);
+ nimbusBlobVersion = metadata.get_version();
+ return nimbusBlobVersion;
+ }
+
+ public static String getFileOwner(String path) throws IOException {
+ return Files.getOwner(FileSystems.getDefault().getPath(path)).getName();
+ }
+
+ public static long localVersionOfBlob(String localFile) {
+ File f = new File(localFile + DEFAULT_BLOB_VERSION_SUFFIX);
+ long currentVersion = 0;
+ if (f.exists() && !(f.isDirectory())) {
+ BufferedReader br = null;
+ try {
+ br = new BufferedReader(new FileReader(f));
+ String line = br.readLine();
+ currentVersion = Long.parseLong(line);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ if (br != null) {
+ br.close();
+ }
+ } catch (Exception ignore) {
+ LOG.error("Exception trying to cleanup", ignore);
+ }
+ }
+ return currentVersion;
+ } else {
+ return -1;
+ }
+ }
+
+ public static String constructBlobWithVersionFileName(String fileName, long version) {
+ return fileName + "." + version;
+ }
+
+ public static String constructBlobCurrentSymlinkName(String fileName) {
+ return fileName + Utils.DEFAULT_CURRENT_BLOB_SUFFIX;
+ }
+
+ public static String constructVersionFileName(String fileName) {
+ return fileName + Utils.DEFAULT_BLOB_VERSION_SUFFIX;
+ }
+ // only works on operating systems that support posix
+ public static void restrictPermissions(String baseDir) {
+ try {
+ Set<PosixFilePermission> perms = new HashSet<PosixFilePermission>(
+ Arrays.asList(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE,
+ PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ,
+ PosixFilePermission.GROUP_EXECUTE));
+ Files.setPosixFilePermissions(FileSystems.getDefault().getPath(baseDir), perms);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
+
public static IFn loadClojureFn(String namespace, String name) {
try {
clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
@@ -505,6 +634,37 @@ public class Utils {
return result;
}
+ private static TDeserializer getDes() {
+ TDeserializer des = threadDes.get();
+ if(des == null) {
+ des = new TDeserializer();
+ threadDes.set(des);
+ }
+ return des;
+ }
+
+ public static byte[] thriftSerialize(TBase t) {
+ try {
+ TSerializer ser = threadSer.get();
+ if (ser == null) {
+ ser = new TSerializer();
+ threadSer.set(ser);
+ }
+ return ser.serialize(t);
+ } catch (TException e) {
+ LOG.error("Failed to serialize to thrift: ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static <T> T thriftDeserialize(Class c, byte[] b) {
+ try {
+ return Utils.thriftDeserialize(c, b, 0, b.length);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public static Integer getInt(Object o, Integer defaultValue) {
if (null == o) {
return defaultValue;
@@ -571,6 +731,245 @@ public class Utils {
return UUID.randomUUID().getLeastSignificantBits();
}
+ /**
+ * Unpack matching files from a jar. Entries inside the jar that do
+ * not match the given pattern will be skipped.
+ *
+ * @param jarFile the .jar file to unpack
+ * @param toDir the destination directory into which to unpack the jar
+ */
+ public static void unJar(File jarFile, File toDir)
+ throws IOException {
+ JarFile jar = new JarFile(jarFile);
+ try {
+ Enumeration<JarEntry> entries = jar.entries();
+ while (entries.hasMoreElements()) {
+ final JarEntry entry = entries.nextElement();
+ if (!entry.isDirectory()) {
+ InputStream in = jar.getInputStream(entry);
+ try {
+ File file = new File(toDir, entry.getName());
+ ensureDirectory(file.getParentFile());
+ OutputStream out = new FileOutputStream(file);
+ try {
+ copyBytes(in, out, 8192);
+ } finally {
+ out.close();
+ }
+ } finally {
+ in.close();
+ }
+ }
+ }
+ } finally {
+ jar.close();
+ }
+ }
+
+ /**
+ * Copies from one stream to another.
+ *
+ * @param in InputStream to read from
+ * @param out OutputStream to write to
+ * @param buffSize the size of the buffer
+ */
+ public static void copyBytes(InputStream in, OutputStream out, int buffSize)
+ throws IOException {
+ PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
+ byte buf[] = new byte[buffSize];
+ int bytesRead = in.read(buf);
+ while (bytesRead >= 0) {
+ out.write(buf, 0, bytesRead);
+ if ((ps != null) && ps.checkError()) {
+ throw new IOException("Unable to write to output stream.");
+ }
+ bytesRead = in.read(buf);
+ }
+ }
+
+ /**
+ * Ensure the existence of a given directory.
+ *
+ * @throws IOException if it cannot be created and does not already exist
+ */
+ private static void ensureDirectory(File dir) throws IOException {
+ if (!dir.mkdirs() && !dir.isDirectory()) {
+ throw new IOException("Mkdirs failed to create " +
+ dir.toString());
+ }
+ }
+
+ /**
+ * Given a Tar File as input it will untar the file in a the untar directory
+ * passed as the second parameter
+ * <p/>
+ * This utility will untar ".tar" files and ".tar.gz","tgz" files.
+ *
+ * @param inFile The tar file as input.
+ * @param untarDir The untar directory where to untar the tar file.
+ * @throws IOException
+ */
+ public static void unTar(File inFile, File untarDir) throws IOException {
+ if (!untarDir.mkdirs()) {
+ if (!untarDir.isDirectory()) {
+ throw new IOException("Mkdirs failed to create " + untarDir);
+ }
+ }
+
+ boolean gzipped = inFile.toString().endsWith("gz");
+ if (onWindows()) {
+ // Tar is not native to Windows. Use simple Java based implementation for
+ // tests and simple tar archives
+ unTarUsingJava(inFile, untarDir, gzipped);
+ } else {
+ // spawn tar utility to untar archive for full fledged unix behavior such
+ // as resolving symlinks in tar archives
+ unTarUsingTar(inFile, untarDir, gzipped);
+ }
+ }
+
+ private static void unTarUsingTar(File inFile, File untarDir,
+ boolean gzipped) throws IOException {
+ StringBuffer untarCommand = new StringBuffer();
+ if (gzipped) {
+ untarCommand.append(" gzip -dc '");
+ untarCommand.append(inFile.toString());
+ untarCommand.append("' | (");
+ }
+ untarCommand.append("cd '");
+ untarCommand.append(untarDir.toString());
+ untarCommand.append("' ; ");
+ untarCommand.append("tar -xf ");
+
+ if (gzipped) {
+ untarCommand.append(" -)");
+ } else {
+ untarCommand.append(inFile.toString());
+ }
+ String[] shellCmd = {"bash", "-c", untarCommand.toString()};
+ ShellUtils.ShellCommandExecutor shexec = new ShellUtils.ShellCommandExecutor(shellCmd);
+ shexec.execute();
+ int exitcode = shexec.getExitCode();
+ if (exitcode != 0) {
+ throw new IOException("Error untarring file " + inFile +
+ ". Tar process exited with exit code " + exitcode);
+ }
+ }
+
+ private static void unTarUsingJava(File inFile, File untarDir,
+ boolean gzipped) throws IOException {
+ InputStream inputStream = null;
+ TarArchiveInputStream tis = null;
+ try {
+ if (gzipped) {
+ inputStream = new BufferedInputStream(new GZIPInputStream(
+ new FileInputStream(inFile)));
+ } else {
+ inputStream = new BufferedInputStream(new FileInputStream(inFile));
+ }
+ tis = new TarArchiveInputStream(inputStream);
+ for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
+ unpackEntries(tis, entry, untarDir);
+ entry = tis.getNextTarEntry();
+ }
+ } finally {
+ cleanup(tis, inputStream);
+ }
+ }
+
+ /**
+ * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
+ * null pointers. Must only be used for cleanup in exception handlers.
+ *
+ * @param closeables the objects to close
+ */
+ private static void cleanup(java.io.Closeable... closeables) {
+ for (java.io.Closeable c : closeables) {
+ if (c != null) {
+ try {
+ c.close();
+ } catch (IOException e) {
+ LOG.debug("Exception in closing " + c, e);
+
+ }
+ }
+ }
+ }
+
+ private static void unpackEntries(TarArchiveInputStream tis,
+ TarArchiveEntry entry, File outputDir) throws IOException {
+ if (entry.isDirectory()) {
+ File subDir = new File(outputDir, entry.getName());
+ if (!subDir.mkdirs() && !subDir.isDirectory()) {
+ throw new IOException("Mkdirs failed to create tar internal dir "
+ + outputDir);
+ }
+ for (TarArchiveEntry e : entry.getDirectoryEntries()) {
+ unpackEntries(tis, e, subDir);
+ }
+ return;
+ }
+ File outputFile = new File(outputDir, entry.getName());
+ if (!outputFile.getParentFile().exists()) {
+ if (!outputFile.getParentFile().mkdirs()) {
+ throw new IOException("Mkdirs failed to create tar internal dir "
+ + outputDir);
+ }
+ }
+ int count;
+ byte data[] = new byte[2048];
+ BufferedOutputStream outputStream = new BufferedOutputStream(
+ new FileOutputStream(outputFile));
+
+ while ((count = tis.read(data)) != -1) {
+ outputStream.write(data, 0, count);
+ }
+ outputStream.flush();
+ outputStream.close();
+ }
+
+ public static boolean onWindows() {
+ if (System.getenv("OS") != null) {
+ return System.getenv("OS").equals("Windows_NT");
+ }
+ return false;
+ }
+
+ public static void unpack(File localrsrc, File dst) throws IOException {
+ String lowerDst = localrsrc.getName().toLowerCase();
+ if (lowerDst.endsWith(".jar")) {
+ unJar(localrsrc, dst);
+ } else if (lowerDst.endsWith(".zip")) {
+ unZip(localrsrc, dst);
+ } else if (lowerDst.endsWith(".tar.gz") ||
+ lowerDst.endsWith(".tgz") ||
+ lowerDst.endsWith(".tar")) {
+ unTar(localrsrc, dst);
+ } else {
+ LOG.warn("Cannot unpack " + localrsrc);
+ if (!localrsrc.renameTo(dst)) {
+ throw new IOException("Unable to rename file: [" + localrsrc
+ + "] to [" + dst + "]");
+ }
+ }
+ if (localrsrc.isFile()) {
+ localrsrc.delete();
+ }
+ }
+
+ public static boolean canUserReadBlob(ReadableBlobMeta meta, String user) {
+ SettableBlobMeta settable = meta.get_settable();
+ for (AccessControl acl : settable.get_acl()) {
+ if (acl.get_type().equals(AccessControlType.OTHER) && (acl.get_access() & BlobStoreAclHandler.READ) > 0) {
+ return true;
+ }
+ if (acl.get_name().equals(user) && (acl.get_access() & BlobStoreAclHandler.READ) > 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) {
return newCurator(conf, servers, port, root, null);
}
@@ -702,6 +1101,38 @@ public class Utils {
return ret;
}
+ /**
+ * Takes an input dir or file and returns the disk usage on that local directory.
+ * Very basic implementation.
+ *
+ * @param dir The input dir to get the disk space of this local dir
+ * @return The total disk space of the input local directory
+ */
+ public static long getDU(File dir) {
+ long size = 0;
+ if (!dir.exists())
+ return 0;
+ if (!dir.isDirectory()) {
+ return dir.length();
+ } else {
+ File[] allFiles = dir.listFiles();
+ if(allFiles != null) {
+ for (int i = 0; i < allFiles.length; i++) {
+ boolean isSymLink;
+ try {
+ isSymLink = org.apache.commons.io.FileUtils.isSymlink(allFiles[i]);
+ } catch(IOException ioe) {
+ isSymLink = true;
+ }
+ if(!isSymLink) {
+ size += getDU(allFiles[i]);
+ }
+ }
+ }
+ return size;
+ }
+ }
+
public static String threadDump() {
final StringBuilder dump = new StringBuilder();
final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean();
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index 44ec967..f1be007 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -477,8 +477,16 @@ public class ConfigValidation {
}
}
- public static class PacemakerAuthTypeValidator extends Validator {
+ public static class MapOfStringToMapOfStringToObjectValidator extends Validator {
+ @Override
+ public void validateField(String name, Object o) {
+ ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false),
+ ConfigValidationUtils.mapFv(String.class, Object.class,true), true);
+ validator.validateField(name, o);
+ }
+ }
+ public static class PacemakerAuthTypeValidator extends Validator {
@Override
public void validateField(String name, Object o) {
if(o == null) {
@@ -486,9 +494,9 @@ public class ConfigValidation {
}
if(o instanceof String &&
- (((String)o).equals("NONE") ||
- ((String)o).equals("DIGEST") ||
- ((String)o).equals("KERBEROS"))) {
+ (((String)o).equals("NONE") ||
+ ((String)o).equals("DIGEST") ||
+ ((String)o).equals("KERBEROS"))) {
return;
}
throw new IllegalArgumentException( "Field " + name + " must be one of \"NONE\", \"DIGEST\", or \"KERBEROS\"");
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
index ed93370..cb742fa 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
@@ -51,7 +51,6 @@ public class ConfigValidationAnnotations {
/**
* Validators with fields: validatorClass and type
*/
-
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isType {
@@ -82,7 +81,6 @@ public class ConfigValidationAnnotations {
/**
* Validators with fields: validatorClass
*/
-
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isString {
@@ -109,7 +107,7 @@ public class ConfigValidationAnnotations {
}
/**
- * validates on object is not null
+ * Validates on object is not null
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@@ -118,7 +116,7 @@ public class ConfigValidationAnnotations {
}
/**
- * validates that there are no duplicates in a list
+ * Validates that there are no duplicates in a list
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@@ -142,7 +140,6 @@ public class ConfigValidationAnnotations {
* Validates the type of each key and value in a map
* Validator with fields: validatorClass, keyValidatorClass, valueValidatorClass
*/
-
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isMapEntryType {
@@ -168,7 +165,7 @@ public class ConfigValidationAnnotations {
}
/**
- * checks if a number is positive and whether zero inclusive
+ * Checks if a number is positive and whether zero inclusive
* Validator with fields: validatorClass, includeZero
*/
@Retention(RetentionPolicy.RUNTIME)
@@ -182,7 +179,6 @@ public class ConfigValidationAnnotations {
/**
* Complex/custom type validators
*/
-
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface isStringOrStringList {
@@ -204,7 +200,6 @@ public class ConfigValidationAnnotations {
/**
* For custom validators
*/
-
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface CustomValidator {
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/py/storm/Nimbus-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote
index 63e7dce..5b8e396 100644
--- a/storm-core/src/py/storm/Nimbus-remote
+++ b/storm-core/src/py/storm/Nimbus-remote
@@ -55,6 +55,20 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
print(' void setWorkerProfiler(string id, ProfileRequest profileRequest)')
print(' getComponentPendingProfileActions(string id, string component_id, ProfileAction action)')
print(' void uploadNewCredentials(string name, Credentials creds)')
+ print(' string beginCreateBlob(string key, SettableBlobMeta meta)')
+ print(' string beginUpdateBlob(string key)')
+ print(' void uploadBlobChunk(string session, string chunk)')
+ print(' void finishBlobUpload(string session)')
+ print(' void cancelBlobUpload(string session)')
+ print(' ReadableBlobMeta getBlobMeta(string key)')
+ print(' void setBlobMeta(string key, SettableBlobMeta meta)')
+ print(' BeginDownloadResult beginBlobDownload(string key)')
+ print(' string downloadBlobChunk(string session)')
+ print(' void deleteBlob(string key)')
+ print(' ListBlobsResult listBlobs(string session)')
+ print(' i32 getBlobReplication(string key)')
+ print(' i32 updateBlobReplication(string key, i32 replication)')
+ print(' void createStateInZookeeper(string key)')
print(' string beginFileUpload()')
print(' void uploadChunk(string location, string chunk)')
print(' void finishFileUpload(string location)')
@@ -204,6 +218,90 @@ elif cmd == 'uploadNewCredentials':
sys.exit(1)
pp.pprint(client.uploadNewCredentials(args[0],eval(args[1]),))
+elif cmd == 'beginCreateBlob':
+ if len(args) != 2:
+ print('beginCreateBlob requires 2 args')
+ sys.exit(1)
+ pp.pprint(client.beginCreateBlob(args[0],eval(args[1]),))
+
+elif cmd == 'beginUpdateBlob':
+ if len(args) != 1:
+ print('beginUpdateBlob requires 1 args')
+ sys.exit(1)
+ pp.pprint(client.beginUpdateBlob(args[0],))
+
+elif cmd == 'uploadBlobChunk':
+ if len(args) != 2:
+ print('uploadBlobChunk requires 2 args')
+ sys.exit(1)
+ pp.pprint(client.uploadBlobChunk(args[0],args[1],))
+
+elif cmd == 'finishBlobUpload':
+ if len(args) != 1:
+ print('finishBlobUpload requires 1 args')
+ sys.exit(1)
+ pp.pprint(client.finishBlobUpload(args[0],))
+
+elif cmd == 'cancelBlobUpload':
+ if len(args) != 1:
+ print('cancelBlobUpload requires 1 args')
+ sys.exit(1)
+ pp.pprint(client.cancelBlobUpload(args[0],))
+
+elif cmd == 'getBlobMeta':
+ if len(args) != 1:
+ print('getBlobMeta requires 1 args')
+ sys.exit(1)
+ pp.pprint(client.getBlobMeta(args[0],))
+
+elif cmd == 'setBlobMeta':
+ if len(args) != 2:
+ print('setBlobMeta requires 2 args')
+ sys.exit(1)
+ pp.pprint(client.setBlobMeta(args[0],eval(args[1]),))
+
+elif cmd == 'beginBlobDownload':
+ if len(args) != 1:
+ print('beginBlobDownload requires 1 args')
+ sys.exit(1)
+ pp.pprint(client.beginBlobDownload(args[0],))
+
+elif cmd == 'downloadBlobChunk':
+ if len(args) != 1:
+ print('downloadBlobChunk requires 1 args')
+ sys.exit(1)
+ pp.pprint(client.downloadBlobChunk(args[0],))
+
+elif cmd == 'deleteBlob':
+ if len(args) != 1:
+ print('deleteBlob requires 1 args')
+ sys.exit(1)
+ pp.pprint(client.deleteBlob(args[0],))
+
+elif cmd == 'listBlobs':
+ if len(args) != 1:
+ print('listBlobs requires 1 args')
+ sys.exit(1)
+ pp.pprint(client.listBlobs(args[0],))
+
+elif cmd == 'getBlobReplication':
+ if len(args) != 1:
+ print('getBlobReplication requires 1 args')
+ sys.exit(1)
+ pp.pprint(client.getBlobReplication(args[0],))
+
+elif cmd == 'updateBlobReplication':
+ if len(args) != 2:
+ print('updateBlobReplication requires 2 args')
+ sys.exit(1)
+ pp.pprint(client.updateBlobReplication(args[0],eval(args[1]),))
+
+elif cmd == 'createStateInZookeeper':
+ if len(args) != 1:
+ print('createStateInZookeeper requires 1 args')
+ sys.exit(1)
+ pp.pprint(client.createStateInZookeeper(args[0],))
+
elif cmd == 'beginFileUpload':
if len(args) != 0:
print('beginFileUpload requires 0 args')