You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:35 UTC

[46/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
new file mode 100644
index 0000000..f49369a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm;
+
+import com.google.common.collect.Sets;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.blobstore.NimbusBlobStore;
+import org.apache.storm.dependency.DependencyPropertiesParser;
+import org.apache.storm.dependency.DependencyUploader;
+import org.apache.storm.hooks.SubmitterHookException;
+import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.validation.ConfigValidation;
+import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.TException;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.generated.*;
+import org.apache.storm.utils.BufferFileInputStream;
+import org.apache.storm.utils.NimbusClient;
+
+/**
+ * Use this class to submit topologies to run on the Storm cluster. You should run your program
+ * with the "storm jar" command from the command-line, and then use this class to
+ * submit your topologies.
+ */
+public class StormSubmitter {
+    public static final Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
+
+    private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
+
+    private static ILocalCluster localNimbus = null;
+
+    private static String generateZookeeperDigestSecretPayload() {
+        return Utils.secureRandomLong() + ":" + Utils.secureRandomLong();
+    }
+
+    public static final Pattern zkDigestPattern = Pattern.compile("\\S+:\\S+");
+
+    public static boolean validateZKDigestPayload(String payload) {
+        if (payload != null) {
+            Matcher m = zkDigestPattern.matcher(payload);
+            return m.matches();
+        }
+        return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static Map prepareZookeeperAuthentication(Map conf) {
+        Map toRet = new HashMap();
+
+        // Is the topology ZooKeeper authentication configuration unset?
+        if (! conf.containsKey(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) ||
+                conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) == null ||
+                !  validateZKDigestPayload((String)
+                    conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD))) {
+
+            String secretPayload = generateZookeeperDigestSecretPayload();
+            toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, secretPayload);
+            LOG.info("Generated ZooKeeper secret payload for MD5-digest: " + secretPayload);
+        }
+
+        // This should always be set to digest.
+        toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest");
+
+        return toRet;
+    }
+
+    private static Map<String,String> populateCredentials(Map conf, Map<String, String> creds) {
+        Map<String,String> ret = new HashMap<>();
+        for (IAutoCredentials autoCred: AuthUtils.GetAutoCredentials(conf)) {
+            LOG.info("Running "+autoCred);
+            autoCred.populateCredentials(ret);
+        }
+        if (creds != null) {
+            ret.putAll(creds);
+        }
+        return ret;
+    }
+
+    /**
+     * Push a new set of credentials to the running topology.
+     * @param name the name of the topology to push credentials to.
+     * @param stormConf the topology-specific configuration, if desired. See {@link Config}.
+     * @param credentials the credentials to push.
+     * @throws AuthorizationException if you are not authorized ot push credentials.
+     * @throws NotAliveException if the topology is not alive
+     * @throws InvalidTopologyException if any other error happens
+     */
+    public static void pushCredentials(String name, Map stormConf, Map<String, String> credentials)
+            throws AuthorizationException, NotAliveException, InvalidTopologyException {
+        stormConf = new HashMap(stormConf);
+        stormConf.putAll(Utils.readCommandLineOpts());
+        Map conf = Utils.readStormConfig();
+        conf.putAll(stormConf);
+        Map<String,String> fullCreds = populateCredentials(conf, credentials);
+        if (fullCreds.isEmpty()) {
+            LOG.warn("No credentials were found to push to " + name);
+            return;
+        }
+        try {
+            if (localNimbus!=null) {
+                LOG.info("Pushing Credentials to topology {} in local mode", name);
+                localNimbus.uploadNewCredentials(name, new Credentials(fullCreds));
+            } else {
+                try(NimbusClient client = NimbusClient.getConfiguredClient(conf)) {
+                    LOG.info("Uploading new credentials to {}", name);
+                    client.getClient().uploadNewCredentials(name, new Credentials(fullCreds));
+                }
+            }
+            LOG.info("Finished pushing creds to topology: {}", name);
+        } catch(TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    /**
+     * Submits a topology to run on the cluster. A topology runs forever or until
+     * explicitly killed.
+     *
+     *
+     * @param name the name of the storm.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     * @throws AuthorizationException if authorization is failed
+     * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+     */
+    public static void submitTopology(String name, Map stormConf, StormTopology topology)
+            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+        submitTopology(name, stormConf, topology, null, null);
+    }
+
+    /**
+     * Submits a topology to run on the cluster. A topology runs forever or until
+     * explicitly killed.
+     *
+     * @param name the name of the storm.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @param opts to manipulate the starting of the topology.
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     * @throws AuthorizationException if authorization is failed
+     * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+     */
+    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
+            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+        submitTopology(name, stormConf, topology, opts, null);
+    }
+
+    /**
+     * Submits a topology to run on the cluster as a particular user. A topology runs forever or until
+     * explicitly killed.
+     *
+     * @param name
+     * @param stormConf
+     * @param topology
+     * @param opts
+     * @param progressListener
+     * @param asUser The user as which this topology should be submitted.
+     * @throws AlreadyAliveException
+     * @throws InvalidTopologyException
+     * @throws AuthorizationException
+     * @throws IllegalArgumentException thrown if configs will yield an unschedulable topology. validateConfs validates confs
+     * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+     */
+    public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
+            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
+        if(!Utils.isValidConf(stormConf)) {
+            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
+        }
+        stormConf = new HashMap(stormConf);
+        stormConf.putAll(Utils.readCommandLineOpts());
+        Map conf = Utils.readStormConfig();
+        conf.putAll(stormConf);
+        stormConf.putAll(prepareZookeeperAuthentication(conf));
+
+        validateConfs(conf, topology);
+
+        Map<String,String> passedCreds = new HashMap<>();
+        if (opts != null) {
+            Credentials tmpCreds = opts.get_creds();
+            if (tmpCreds != null) {
+                passedCreds = tmpCreds.get_creds();
+            }
+        }
+        Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
+        if (!fullCreds.isEmpty()) {
+            if (opts == null) {
+                opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
+            }
+            opts.set_creds(new Credentials(fullCreds));
+        }
+        try {
+            if (localNimbus!=null) {
+                LOG.info("Submitting topology " + name + " in local mode");
+                if (opts!=null) {
+                    localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
+                } else {
+                    // this is for backwards compatibility
+                    localNimbus.submitTopology(name, stormConf, topology);
+                }
+                LOG.info("Finished submitting topology: " +  name);
+            } else {
+                String serConf = JSONValue.toJSONString(stormConf);
+                try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
+                    if (topologyNameExists(name, client)) {
+                        throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+                    }
+
+                    // Dependency uploading only makes sense for distributed mode
+                    List<String> jarsBlobKeys = Collections.emptyList();
+                    List<String> artifactsBlobKeys;
+
+                    DependencyUploader uploader = new DependencyUploader();
+                    try {
+                        uploader.init();
+
+                        jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);
+
+                        artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
+                    } catch (Throwable e) {
+                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
+                        uploader.deleteBlobs(jarsBlobKeys);
+                        uploader.shutdown();
+                        throw e;
+                    }
+
+                    try {
+                        setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
+                        submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client);
+                    } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
+                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
+                        // Note that we don't handle TException to delete jars blobs
+                        // because it's safer to leave some blobs instead of topology not running
+                        uploader.deleteBlobs(jarsBlobKeys);
+                        throw e;
+                    } finally {
+                        uploader.shutdown();
+                    }
+                }
+            }
+        } catch(TException e) {
+            throw new RuntimeException(e);
+        }
+        invokeSubmitterHook(name, asUser, conf, topology);
+
+    }
+
+    private static List<String> uploadDependencyJarsToBlobStore(DependencyUploader uploader) {
+        LOG.info("Uploading dependencies - jars...");
+
+        DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();
+
+        String depJarsProp = System.getProperty("storm.dependency.jars", "");
+        List<File> depJars = propertiesParser.parseJarsProperties(depJarsProp);
+
+        try {
+            return uploader.uploadFiles(depJars, true);
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static List<String> uploadDependencyArtifactsToBlobStore(DependencyUploader uploader) {
+        LOG.info("Uploading dependencies - artifacts...");
+
+        DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();
+
+        String depArtifactsProp = System.getProperty("storm.dependency.artifacts", "{}");
+        Map<String, File> depArtifacts = propertiesParser.parseArtifactsProperties(depArtifactsProp);
+
+        try {
+            return uploader.uploadArtifacts(depArtifacts);
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void setDependencyBlobsToTopology(StormTopology topology, List<String> jarsBlobKeys, List<String> artifactsBlobKeys) {
+        LOG.info("Dependency Blob keys - jars : {} / artifacts : {}", jarsBlobKeys, artifactsBlobKeys);
+        topology.set_dependency_jars(jarsBlobKeys);
+        topology.set_dependency_artifacts(artifactsBlobKeys);
+    }
+
+    private static void submitTopologyInDistributeMode(String name, StormTopology topology, SubmitOptions opts,
+                                                       ProgressListener progressListener, String asUser, Map conf,
+                                                       String serConf, NimbusClient client) throws TException {
+        try {
+            String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client);
+            LOG.info("Submitting topology {} in distributed mode with conf {}", name, serConf);
+
+            if (opts != null) {
+                client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
+            } else {
+                // this is for backwards compatibility
+                client.getClient().submitTopology(name, jar, serConf, topology);
+            }
+            LOG.info("Finished submitting topology: {}", name);
+        } catch (InvalidTopologyException e) {
+            LOG.warn("Topology submission exception: {}", e.get_msg());
+            throw e;
+        } catch (AlreadyAliveException e) {
+            LOG.warn("Topology already alive exception", e);
+            throw e;
+        }
+    }
+
+    /**
+     *
+     * @param name
+     * @param asUser
+     * @param stormConf
+     * @param topology
+     *
+     * @thorws SubmitterHookException This is thrown when any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+     */
+    private static void invokeSubmitterHook(String name, String asUser, Map stormConf, StormTopology topology) {
+        String submissionNotifierClassName = null;
+        try {
+            if (stormConf.containsKey(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN)) {
+                submissionNotifierClassName = stormConf.get(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN).toString();
+                LOG.info("Initializing the registered ISubmitterHook [{}]", submissionNotifierClassName);
+
+                if(submissionNotifierClassName == null || submissionNotifierClassName.isEmpty()) {
+                    throw new IllegalArgumentException(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN + " property must be a non empty string.");
+                }
+
+                ISubmitterHook submitterHook = (ISubmitterHook) Class.forName(submissionNotifierClassName).newInstance();
+                TopologyInfo topologyInfo = Utils.getTopologyInfo(name, asUser, stormConf);
+                LOG.info("Invoking the registered ISubmitterHook [{}]", submissionNotifierClassName);
+                submitterHook.notify(topologyInfo, stormConf, topology);
+            }
+        } catch (Exception e) {
+            LOG.warn("Error occurred in invoking submitter hook:[{}] ",submissionNotifierClassName, e);
+            throw new SubmitterHookException(e);
+        }
+    }
+
+    /**
+     * Submits a topology to run on the cluster. A topology runs forever or until
+     * explicitly killed.
+     *
+     *
+     * @param name the name of the storm.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @param opts to manipulate the starting of the topology
+     * @param progressListener to track the progress of the jar upload process
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     * @throws AuthorizationException if authorization is failed
+     * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+     */
+    @SuppressWarnings("unchecked")
+    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
+             ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+        submitTopologyAs(name, stormConf, topology, opts, progressListener, null);
+    }
+
+    /**
+     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+     * explicitly killed.
+     *
+     *
+     * @param name the name of the storm.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     * @throws AuthorizationException if authorization is failed
+     */
+
+    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+        submitTopologyWithProgressBar(name, stormConf, topology, null);
+    }
+
+    /**
+     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+     * explicitly killed.
+     *
+     *
+     * @param name the name of the storm.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @param opts to manipulate the starting of the topology
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     * @throws AuthorizationException if authorization is failed
+     * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+     */
+    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+        // show a progress bar so we know we're not stuck (especially on slow connections)
+        submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
+            @Override
+            public void onStart(String srcFile, String targetFile, long totalBytes) {
+                System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+            }
+
+            @Override
+            public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
+                int length = 50;
+                int p = (int)((length * bytesUploaded) / totalBytes);
+                String progress = StringUtils.repeat("=", p);
+                String todo = StringUtils.repeat(" ", length - p);
+
+                System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
+            }
+
+            @Override
+            public void onCompleted(String srcFile, String targetFile, long totalBytes) {
+                System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+            }
+        });
+    }
+
+    private static boolean topologyNameExists(String name, NimbusClient client) {
+        try {
+            return !client.getClient().isTopologyNameAllowed(name);
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static String submitJar(Map conf, ProgressListener listener) {
+        return  submitJar(conf, System.getProperty("storm.jar"), listener);
+    }
+
+    /**
+     * Submit jar file
+     * @param conf the topology-specific configuration. See {@link Config}.
+     * @param localJar file path of the jar file to submit
+     * @return the remote location of the submitted jar
+     */
+    public static String submitJar(Map conf, String localJar) {
+        return submitJar(conf, localJar, null);
+    }
+
+    public static String submitJarAs(Map conf, String localJar, ProgressListener listener, NimbusClient client) {
+        if (localJar == null) {
+            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+        }
+
+        try {
+            String uploadLocation = client.getClient().beginFileUpload();
+            LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
+            BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);
+
+            long totalSize = new File(localJar).length();
+            if (listener != null) {
+                listener.onStart(localJar, uploadLocation, totalSize);
+            }
+
+            long bytesUploaded = 0;
+            while(true) {
+                byte[] toSubmit = is.read();
+                bytesUploaded += toSubmit.length;
+                if (listener != null) {
+                    listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
+                }
+
+                if(toSubmit.length==0) break;
+                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
+            }
+            client.getClient().finishFileUpload(uploadLocation);
+
+            if (listener != null) {
+                listener.onCompleted(localJar, uploadLocation, totalSize);
+            }
+
+            LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
+            return uploadLocation;
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    public static String submitJarAs(Map conf, String localJar, ProgressListener listener, String asUser) {
+        if (localJar == null) {
+            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+        }
+
+        try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
+            return submitJarAs(conf, localJar, listener, client);
+        }
+    }
+
+    /**
+     * Submit jar file
+     * @param conf the topology-specific configuration. See {@link Config}.
+     * @param localJar file path of the jar file to submit
+     * @param listener progress listener to track the jar file upload
+     * @return the remote location of the submitted jar
+     */
+    public static String submitJar(Map conf, String localJar, ProgressListener listener) {
+        return submitJarAs(conf,localJar, listener, (String)null);
+    }
+
+    /**
+     * Interface use to track progress of file upload
+     */
+    public interface ProgressListener {
+        /**
+         * called before file is uploaded
+         * @param srcFile - jar file to be uploaded
+         * @param targetFile - destination file
+         * @param totalBytes - total number of bytes of the file
+         */
+        public void onStart(String srcFile, String targetFile, long totalBytes);
+
+        /**
+         * called whenever a chunk of bytes is uploaded
+         * @param srcFile - jar file to be uploaded
+         * @param targetFile - destination file
+         * @param bytesUploaded - number of bytes transferred so far
+         * @param totalBytes - total number of bytes of the file
+         */
+        public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes);
+
+        /**
+         * called when the file is uploaded
+         * @param srcFile - jar file to be uploaded
+         * @param targetFile - destination file
+         * @param totalBytes - total number of bytes of the file
+         */
+        public void onCompleted(String srcFile, String targetFile, long totalBytes);
+    }
+
+    private static void validateConfs(Map<String, Object> stormConf, StormTopology topology) throws IllegalArgumentException, InvalidTopologyException {
+        ConfigValidation.validateFields(stormConf);
+        validateTopologyWorkerMaxHeapSizeMBConfigs(stormConf, topology);
+        Utils.validateTopologyBlobStoreMap(stormConf, getListOfKeysFromBlobStore(stormConf));
+    }
+
+    private static void validateTopologyWorkerMaxHeapSizeMBConfigs(Map<String, Object> stormConf, StormTopology topology) {
+        double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
+        Double topologyWorkerMaxHeapSize = ObjectReader.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
+        if(topologyWorkerMaxHeapSize < largestMemReq) {
+            throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
+                    + ObjectReader.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < "
+                    + largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount");
+        }
+    }
+
+    private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map topologyConf) {
+        double largestMemoryOperator = 0.0;
+        for(Map<String, Double> entry : ResourceUtils.getBoltsResources(topology, topologyConf).values()) {
+            double memoryRequirement = entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
+                    + entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+            if(memoryRequirement > largestMemoryOperator) {
+                largestMemoryOperator = memoryRequirement;
+            }
+        }
+        for(Map<String, Double> entry : ResourceUtils.getSpoutsResources(topology, topologyConf).values()) {
+            double memoryRequirement = entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
+                    + entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+            if(memoryRequirement > largestMemoryOperator) {
+                largestMemoryOperator = memoryRequirement;
+            }
+        }
+        return largestMemoryOperator;
+    }
+
+    private static Set<String> getListOfKeysFromBlobStore(Map<String, Object> stormConf) {
+        try (NimbusBlobStore client = new NimbusBlobStore()) {
+            client.prepare(stormConf);
+            return Sets.newHashSet(client.listKeys());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormTimer.java b/storm-client/src/jvm/org/apache/storm/StormTimer.java
new file mode 100644
index 0000000..4f6a7d5
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/StormTimer.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Time;
+
+import java.nio.channels.ClosedByInterruptException;
+import java.util.Comparator;
+import java.util.Random;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The timer defined in this file is very similar to java.util.Timer, except
+ * it integrates with Storm's time simulation capabilities. This lets us test
+ * code that does asynchronous work on the timer thread
+ */
+
+public class StormTimer implements AutoCloseable {
+
+    public static class QueueEntry {
+        public final Long endTimeMs;
+        public final Runnable func;
+        public final String id;
+
+        public QueueEntry(Long endTimeMs, Runnable func, String id) {
+            this.endTimeMs = endTimeMs;
+            this.func = func;
+            this.id = id;
+        }
+    }
+
+    public static class StormTimerTask extends Thread {
+
+        //initialCapacity set to 11 since its the default inital capacity of PriorityBlockingQueue
+        private PriorityBlockingQueue<QueueEntry> queue = new PriorityBlockingQueue<QueueEntry>(11, new Comparator<QueueEntry>() {
+            @Override
+            public int compare(QueueEntry o1, QueueEntry o2) {
+                return o1.endTimeMs.intValue() - o2.endTimeMs.intValue();
+            }
+        });
+
+        // boolean to indicate whether timer is active
+        private AtomicBoolean active = new AtomicBoolean(false);
+
+        // function to call when timer is killed
+        private Thread.UncaughtExceptionHandler onKill;
+
+        //random number generator
+        private Random random = new Random();
+
+        @Override
+        public void run() {
+            while (this.active.get()) {
+                QueueEntry queueEntry = null;
+                try {
+                    queueEntry = this.queue.peek();
+                    if ((queueEntry != null) && (Time.currentTimeMillis() >= queueEntry.endTimeMs)) {
+                        // It is imperative to not run the function
+                        // inside the timer lock. Otherwise, it is
+                        // possible to deadlock if the fn deals with
+                        // other locks, like the submit lock.
+                        this.queue.remove(queueEntry);
+                        queueEntry.func.run();
+                    } else if (queueEntry != null) {
+                        //  If any events are scheduled, sleep until
+                        // event generation. If any recurring events
+                        // are scheduled then we will always go
+                        // through this branch, sleeping only the
+                        // exact necessary amount of time. We give
+                        // an upper bound, e.g. 1000 millis, to the
+                        // sleeping time, to limit the response time
+                        // for detecting any new event within 1 secs.
+                        Time.sleep(Math.min(1000, (queueEntry.endTimeMs - Time.currentTimeMillis())));
+                    } else {
+                        // Otherwise poll to see if any new event
+                        // was scheduled. This is, in essence, the
+                        // response time for detecting any new event
+                        // schedulings when there are no scheduled
+                        // events.
+                        Time.sleep(1000);
+                    }
+                } catch (Throwable e) {
+                    if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e))
+                            && !(Utils.exceptionCauseIsInstanceOf(ClosedByInterruptException.class, e))) {
+                        this.onKill.uncaughtException(this, e);
+                        this.setActive(false);
+                    }
+                }
+            }
+        }
+
+        public void setOnKillFunc(Thread.UncaughtExceptionHandler onKill) {
+            this.onKill = onKill;
+        }
+
+        public void setActive(boolean flag) {
+            this.active.set(flag);
+        }
+
+        public boolean isActive() {
+            return this.active.get();
+        }
+
+        public void add(QueueEntry queueEntry) {
+            this.queue.add(queueEntry);
+        }
+    }
+
+    //task to run
+    private StormTimerTask task = new StormTimerTask();
+
+    /**
+     * Makes a Timer in the form of a StormTimerTask Object
+     * @param name name of the timer
+     * @param onKill function to call when timer is killed unexpectedly
+     * @return StormTimerTask object that was initialized
+     */
+    public StormTimer (String name, Thread.UncaughtExceptionHandler onKill) {
+        if (onKill == null) {
+            throw new RuntimeException("onKill func is null!");
+        }
+        if (name == null) {
+            this.task.setName("timer");
+        } else {
+            this.task.setName(name);
+        }
+        this.task.setOnKillFunc(onKill);
+        this.task.setActive(true);
+
+        this.task.setDaemon(true);
+        this.task.setPriority(Thread.MAX_PRIORITY);
+        this.task.start();
+    }
+
+    /**
+     * Schedule a function to be executed in the timer
+     * @param delaySecs the number of seconds to delay before running the function
+     * @param func the function to run
+     * @param checkActive whether to check is the timer is active
+     * @param jitterMs add jitter to the run
+     */
+    public void schedule(int delaySecs, Runnable func, boolean checkActive, int jitterMs) {
+        if (func == null) {
+            throw new RuntimeException("function to schedule is null!");
+        }
+        if (checkActive) {
+            checkActive();
+        }
+        String id = Utils.uuid();
+        long endTimeMs = Time.currentTimeMillis() + Time.secsToMillisLong(delaySecs);
+        if (jitterMs > 0) {
+            endTimeMs = this.task.random.nextInt(jitterMs) + endTimeMs;
+        }
+        task.add(new QueueEntry(endTimeMs, func, id));
+    }
+
+    public void schedule(int delaySecs, Runnable func) {
+        schedule(delaySecs, func, true, 0);
+    }
+
+    /**
+     * Schedule a function to run recurrently
+     * @param delaySecs the number of seconds to delay before running the function
+     * @param recurSecs the time between each invocation
+     * @param func the function to run
+     */
+    public void scheduleRecurring(int delaySecs, final int recurSecs, final Runnable func) {
+        schedule(delaySecs, new Runnable() {
+            @Override
+            public void run() {
+                func.run();
+                // This avoids a race condition with cancel-timer.
+                schedule(recurSecs, this, false, 0);
+            }
+        });
+    }
+
+    /**
+     * schedule a function to run recurrently with jitter
+     * @param delaySecs the number of seconds to delay before running the function
+     * @param recurSecs the time between each invocation
+     * @param jitterMs jitter added to the run
+     * @param func the function to run
+     */
+    public void scheduleRecurringWithJitter(int delaySecs, final int recurSecs, final int jitterMs, final Runnable func) {
+        schedule(delaySecs, new Runnable() {
+            @Override
+            public void run() {
+                func.run();
+                // This avoids a race condition with cancel-timer.
+                schedule(recurSecs, this, false, jitterMs);
+            }
+        });
+    }
+
+    /**
+     * check if timer is active
+     */
+    private void checkActive() {
+        if (!this.task.isActive()) {
+            throw new IllegalStateException("Timer is not active");
+        }
+    }
+
+    /**
+     * cancel timer
+     */
+
+    @Override
+    public void close() throws Exception {
+        if (this.task.isActive()) {
+            this.task.setActive(false);
+            this.task.interrupt();
+            this.task.join();
+        }
+    }
+
+    /**
+     * is timer waiting. Used in timer simulation
+     */
+    public boolean isTimerWaiting() {
+        return Time.isThreadWaiting(task);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/Thrift.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Thrift.java b/storm-client/src/jvm/org/apache/storm/Thrift.java
new file mode 100644
index 0000000..2e5080a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/Thrift.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm;
+
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.JavaObjectArg;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StreamInfo;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StormTopology._Fields;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.NullStruct;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.ComponentObject;
+
+import org.apache.storm.task.IBolt;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.SpoutDeclarer;
+import org.apache.storm.utils.Utils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.topology.TopologyBuilder;
+
+public class Thrift {
+    private static Logger LOG = LoggerFactory.getLogger(Thrift.class);
+
+    private static StormTopology._Fields[] STORM_TOPOLOGY_FIELDS = null;
+    private static StormTopology._Fields[] SPOUT_FIELDS =
+            { StormTopology._Fields.SPOUTS, StormTopology._Fields.STATE_SPOUTS };
+
+    static {
+        Set<_Fields> keys = StormTopology.metaDataMap.keySet();
+        keys.toArray(STORM_TOPOLOGY_FIELDS = new StormTopology._Fields[keys.size()]);
+    }
+
+    public static StormTopology._Fields[] getTopologyFields() {
+        return STORM_TOPOLOGY_FIELDS;
+    }
+
+    public static StormTopology._Fields[] getSpoutFields() {
+        return SPOUT_FIELDS;
+    }
+
+    public static class SpoutDetails {
+        private IRichSpout spout;
+        private Integer parallelism;
+        private Map conf;
+
+        public SpoutDetails(IRichSpout spout, Integer parallelism, Map conf) {
+            this.spout = spout;
+            this.parallelism = parallelism;
+            this.conf = conf;
+        }
+
+        public IRichSpout getSpout() {
+            return spout;
+        }
+
+        public Integer getParallelism() {
+            return parallelism;
+        }
+
+        public Map getConf() {
+            return conf;
+        }
+    }
+
+    public static class BoltDetails {
+        private Object bolt;
+        private Map conf;
+        private Integer parallelism;
+        private Map<GlobalStreamId, Grouping> inputs;
+
+        public BoltDetails(Object bolt, Map conf, Integer parallelism,
+                           Map<GlobalStreamId, Grouping> inputs) {
+            this.bolt = bolt;
+            this.conf = conf;
+            this.parallelism = parallelism;
+            this.inputs = inputs;
+        }
+
+        public Object getBolt() {
+            return bolt;
+        }
+
+        public Map getConf() {
+            return conf;
+        }
+
+        public Map<GlobalStreamId, Grouping> getInputs() {
+            return inputs;
+        }
+
+        public Integer getParallelism() {
+            return parallelism;
+        }
+    }
+
+    public static StreamInfo directOutputFields(List<String> fields) {
+        return new StreamInfo(fields, true);
+    }
+
+    public static StreamInfo outputFields(List<String> fields) {
+        return new StreamInfo(fields, false);
+    }
+
+    public static Grouping prepareShuffleGrouping() {
+        return Grouping.shuffle(new NullStruct());
+    }
+
+    public static Grouping prepareLocalOrShuffleGrouping() {
+        return Grouping.local_or_shuffle(new NullStruct());
+    }
+
+    public static Grouping prepareFieldsGrouping(List<String> fields) {
+        return Grouping.fields(fields);
+    }
+
+    public static Grouping prepareGlobalGrouping() {
+        return prepareFieldsGrouping(new ArrayList<String>());
+    }
+
+    public static Grouping prepareDirectGrouping() {
+        return Grouping.direct(new NullStruct());
+    }
+
+    public static Grouping prepareAllGrouping() {
+        return Grouping.all(new NullStruct());
+    }
+
+    public static Grouping prepareNoneGrouping() {
+        return Grouping.none(new NullStruct());
+    }
+
+    public static Grouping prepareCustomStreamGrouping(Object obj) {
+        return Grouping.custom_serialized(Utils.javaSerialize(obj));
+    }
+
+    public static Grouping prepareCustomJavaObjectGrouping(JavaObject obj) {
+        return Grouping.custom_object(obj);
+    }
+
+    public static Object instantiateJavaObject(JavaObject obj) {
+
+        List<JavaObjectArg> args = obj.get_args_list();
+        Class[] paraTypes = new Class[args.size()];
+        Object[] paraValues = new Object[args.size()];
+        for (int i = 0; i < args.size(); i++) {
+            JavaObjectArg arg = args.get(i);
+            paraValues[i] = arg.getFieldValue();
+
+            if (arg.getSetField().equals(JavaObjectArg._Fields.INT_ARG)) {
+                paraTypes[i] = Integer.class;
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.LONG_ARG)) {
+                paraTypes[i] = Long.class;
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.STRING_ARG)) {
+                paraTypes[i] = String.class;
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.BOOL_ARG)) {
+                paraTypes[i] = Boolean.class;
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.BINARY_ARG)) {
+                paraTypes[i] = ByteBuffer.class;
+            } else if (arg.getSetField().equals(JavaObjectArg._Fields.DOUBLE_ARG)) {
+                paraTypes[i] = Double.class;
+            } else {
+                paraTypes[i] = Object.class;
+            }
+        }
+
+        try {
+            Class clazz = Class.forName(obj.get_full_class_name());
+            Constructor cons = clazz.getConstructor(paraTypes);
+            return cons.newInstance(paraValues);
+        } catch (Exception e) {
+            LOG.error("java object instantiation failed", e);
+        }
+
+        return null;
+
+    }
+
+    public static Grouping._Fields groupingType(Grouping grouping) {
+        return grouping.getSetField();
+    }
+
+    public static List<String> fieldGrouping(Grouping grouping) {
+        if (!Grouping._Fields.FIELDS.equals(groupingType(grouping))) {
+            throw new IllegalArgumentException("Tried to get grouping fields from non fields grouping");
+        }
+        return grouping.get_fields();
+    }
+
+    public static boolean isGlobalGrouping(Grouping grouping) {
+        if (Grouping._Fields.FIELDS.equals(groupingType(grouping))) {
+            return fieldGrouping(grouping).isEmpty();
+        }
+
+        return false;
+    }
+
+    public static int getParallelismHint(ComponentCommon componentCommon) {
+        if (!componentCommon.is_set_parallelism_hint()) {
+            return 1;
+        } else {
+            return componentCommon.get_parallelism_hint();
+        }
+    }
+
+    public static ComponentObject serializeComponentObject(Object obj) {
+        return ComponentObject.serialized_java(Utils.javaSerialize(obj));
+    }
+
+    public static Object deserializeComponentObject(ComponentObject obj) {
+        if (obj.getSetField() != ComponentObject._Fields.SERIALIZED_JAVA) {
+            throw new RuntimeException("Cannot deserialize non-java-serialized object");
+        }
+        return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class);
+    }
+
+    public static ComponentCommon prepareComponentCommon(Map<GlobalStreamId, Grouping> inputs, Map<String,
+            StreamInfo> outputs, Integer parallelismHint) {
+        return prepareComponentCommon(inputs, outputs, parallelismHint, null);
+    }
+
+    public static ComponentCommon prepareComponentCommon(Map<GlobalStreamId, Grouping> inputs, Map<String, StreamInfo> outputs,
+                                                         Integer parallelismHint, Map conf) {
+        Map<GlobalStreamId, Grouping> mappedInputs = new HashMap<>();
+        Map<String, StreamInfo> mappedOutputs = new HashMap<>();
+        if (inputs != null && !inputs.isEmpty()) {
+            mappedInputs.putAll(inputs);
+        }
+        if (outputs !=null && !outputs.isEmpty()) {
+            mappedOutputs.putAll(outputs);
+        }
+        ComponentCommon component = new ComponentCommon(mappedInputs, mappedOutputs);
+        if (parallelismHint != null) {
+            component.set_parallelism_hint(parallelismHint);
+        }
+        if (conf != null) {
+            component.set_json_conf(JSONValue.toJSONString(conf));
+        }
+        return component;
+    }
+
+    public static SpoutSpec prepareSerializedSpoutDetails(IRichSpout spout, Map<String, StreamInfo> outputs) {
+        return new SpoutSpec(ComponentObject.serialized_java
+                (Utils.javaSerialize(spout)), prepareComponentCommon(new HashMap<>(), outputs, null, null));
+    }
+
+    public static Bolt prepareSerializedBoltDetails(Map<GlobalStreamId, Grouping> inputs, IBolt bolt, Map<String, StreamInfo> outputs,
+                                                    Integer parallelismHint, Map conf) {
+        ComponentCommon common = prepareComponentCommon(inputs, outputs, parallelismHint, conf);
+        return new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common);
+    }
+
+    public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt) {
+        return prepareBoltDetails(inputs, bolt, null, null);
+    }
+
+    public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt,
+                                                 Integer parallelismHint) {
+        return prepareBoltDetails(inputs, bolt, parallelismHint, null);
+    }
+
+    public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt,
+                                                 Integer parallelismHint, Map conf) {
+        BoltDetails details = new BoltDetails(bolt, conf, parallelismHint, inputs);
+        return details;
+    }
+
+    public static SpoutDetails prepareSpoutDetails(IRichSpout spout) {
+        return prepareSpoutDetails(spout, null, null);
+    }
+
+    public static SpoutDetails prepareSpoutDetails(IRichSpout spout, Integer parallelismHint) {
+        return prepareSpoutDetails(spout, parallelismHint, null);
+    }
+
+    public static SpoutDetails prepareSpoutDetails(IRichSpout spout, Integer parallelismHint, Map conf) {
+        SpoutDetails details = new SpoutDetails(spout, parallelismHint, conf);
+        return details;
+    }
+
+    public static StormTopology buildTopology(HashMap<String, SpoutDetails> spoutMap,
+                                              HashMap<String, BoltDetails> boltMap, HashMap<String, StateSpoutSpec> stateMap) {
+        return buildTopology(spoutMap, boltMap);
+    }
+
+    private static void addInputs(BoltDeclarer declarer, Map<GlobalStreamId, Grouping> inputs) {
+        for(Entry<GlobalStreamId, Grouping> entry : inputs.entrySet()) {
+            declarer.grouping(entry.getKey(), entry.getValue());
+        }
+    }
+
+    public static StormTopology buildTopology(Map<String, SpoutDetails> spoutMap, Map<String, BoltDetails> boltMap) {
+        TopologyBuilder builder = new TopologyBuilder();
+        for (Entry<String, SpoutDetails> entry : spoutMap.entrySet()) {
+            String spoutID = entry.getKey();
+            SpoutDetails spec = entry.getValue();
+            SpoutDeclarer spoutDeclarer = builder.setSpout(spoutID, spec.getSpout(), spec.getParallelism());
+            spoutDeclarer.addConfigurations(spec.getConf());
+        }
+        for (Entry<String, BoltDetails> entry : boltMap.entrySet()) {
+            String spoutID = entry.getKey();
+            BoltDetails spec = entry.getValue();
+            BoltDeclarer boltDeclarer = null;
+            if (spec.bolt instanceof IRichBolt) {
+                boltDeclarer = builder.setBolt(spoutID, (IRichBolt)spec.getBolt(), spec.getParallelism());
+            } else {
+                boltDeclarer = builder.setBolt(spoutID, (IBasicBolt)spec.getBolt(), spec.getParallelism());
+            }
+            boltDeclarer.addConfigurations(spec.getConf());
+            addInputs(boltDeclarer, spec.getInputs());
+        }
+        return builder.createTopology();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/annotation/InterfaceStability.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/annotation/InterfaceStability.java b/storm-client/src/jvm/org/apache/storm/annotation/InterfaceStability.java
new file mode 100644
index 0000000..d05ae75
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/annotation/InterfaceStability.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Annotation to inform users of how much to rely on a particular package,
+ * class or method not changing over time.
+ * </ul>
+ */
+@InterfaceStability.Evolving
+public class InterfaceStability {
+    /**
+     * Can evolve while retaining compatibility for minor release boundaries.;
+     * can break compatibility only at major release (ie. at m.0).
+     */
+    @Documented
+    @Retention(RetentionPolicy.RUNTIME)
+    public @interface Stable {};
+
+    /**
+     * Evolving, but can break compatibility at minor release (i.e. m.x)
+     */
+    @Documented
+    @Retention(RetentionPolicy.RUNTIME)
+    public @interface Evolving {};
+
+    /**
+     * No guarantee is provided as to reliability or stability across any
+     * level of release granularity.
+     */
+    @Documented
+    @Retention(RetentionPolicy.RUNTIME)
+    public @interface Unstable {};
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/AtomicOutputStream.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/AtomicOutputStream.java b/storm-client/src/jvm/org/apache/storm/blobstore/AtomicOutputStream.java
new file mode 100644
index 0000000..892e25c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/AtomicOutputStream.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An output stream where all of the data is committed on close,
+ * or can be canceled with cancel.
+ */
+public abstract class AtomicOutputStream extends OutputStream {
+    /**
+     * Cancel all of the writes associated with this stream and close it.
+     */ 
+    public abstract void cancel() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
new file mode 100644
index 0000000..1c10c40
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.security.auth.Subject;
+
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
+
+/**
+ * Provides a way to store blobs that can be downloaded.
+ * Blobs must be able to be uploaded and listed from Nimbus,
+ * and downloaded from the Supervisors. It is a key value based
+ * store. Key being a string and value being the blob data.
+ *
+ * ACL checking must take place against the provided subject.
+ * If the blob store does not support Security it must validate
+ * that all ACLs set are always WORLD, everything.
+ *
+ * The users can upload their blobs through the blob store command
+ * line. The command line also allows us to update and delete blobs.
+ *
+ * Modifying the replication factor only works for HdfsBlobStore
+ * as for the LocalFsBlobStore the replication is dependent on
+ * the number of Nimbodes available.
+ */
+public abstract class BlobStore implements Shutdownable {
+    private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);
+    private static final Pattern KEY_PATTERN = Pattern.compile("^[\\w \\t\\.:_-]+$");
+    protected static final String BASE_BLOBS_DIR_NAME = "blobs";
+
+    /**
+     * Allows us to initialize the blob store
+     * @param conf The storm configuration
+     * @param baseDir The directory path to store the blobs
+     * @param nimbusInfo Contains the nimbus host, port and leadership information.
+     */
+    public abstract void prepare(Map conf, String baseDir, NimbusInfo nimbusInfo);
+
+    /**
+     * Creates the blob.
+     * @param key Key for the blob.
+     * @param meta Metadata which contains the acls information
+     * @param who Is the subject creating the blob.
+     * @return AtomicOutputStream returns a stream into which the data
+     * can be written.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     */
+    public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException;
+
+    /**
+     * Updates the blob data.
+     * @param key Key for the blob.
+     * @param who Is the subject having the write privilege for the blob.
+     * @return AtomicOutputStream returns a stream into which the data
+     * can be written.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Gets the current version of metadata for a blob
+     * to be viewed by the user or downloaded by the supervisor.
+     * @param key Key for the blob.
+     * @param who Is the subject having the read privilege for the blob.
+     * @return AtomicOutputStream returns a stream into which the data
+     * can be written.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Sets the metadata with renewed acls for the blob.
+     * @param key Key for the blob.
+     * @param meta Metadata which contains the updated
+     * acls information.
+     * @param who Is the subject having the write privilege for the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Deletes the blob data and metadata.
+     * @param key Key for the blob.
+     * @param who Is the subject having write privilege for the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Gets the InputStream to read the blob details
+     * @param key Key for the blob.
+     * @param who Is the subject having the read privilege for the blob.
+     * @return InputStreamWithMeta has the additional
+     * file length and version information.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Returns an iterator with all the list of
+     * keys currently available on the blob store.
+     * @return Iterator<String>
+     */
+    public abstract Iterator<String> listKeys();
+
+    /**
+     * Gets the replication factor of the blob.
+     * @param key Key for the blob.
+     * @param who Is the subject having the read privilege for the blob.
+     * @return BlobReplication object containing the
+     * replication factor for the blob.
+     * @throws Exception
+     */
+    public abstract int getBlobReplication(String key, Subject who) throws Exception;
+
+    /**
+     * Modifies the replication factor of the blob.
+     * @param key Key for the blob.
+     * @param replication The replication factor the
+     * blob has to be set.
+     * @param who Is the subject having the update privilege for the blob
+     * @return BlobReplication object containing the
+     * updated replication factor for the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     * @throws IOException
+     */
+    public abstract int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException, IOException;
+
+    /**
+     * Filters keys based on the KeyFilter
+     * passed as the argument.
+     * @param filter KeyFilter
+     * @param <R> Type
+     * @return Set of filtered keys
+     */
+    public <R> Set<R> filterAndListKeys(KeyFilter<R> filter) {
+        Set<R> ret = new HashSet<R>();
+        Iterator<String> keys = listKeys();
+        while (keys.hasNext()) {
+            String key = keys.next();
+            R filtered = filter.filter(key);
+            if (filtered != null) {
+                ret.add(filtered);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Validates key checking for potentially harmful patterns
+     * @param key Key for the blob.
+     */
+    public static final void validateKey(String key) throws AuthorizationException {
+        if (StringUtils.isEmpty(key) || "..".equals(key) || ".".equals(key) || !KEY_PATTERN.matcher(key).matches()) {
+            LOG.error("'{}' does not appear to be valid {}", key, KEY_PATTERN);
+            throw new AuthorizationException(key+" does not appear to be a valid blob key");
+        }
+    }
+
+    /**
+     * Wrapper called to create the blob which contains
+     * the byte data
+     * @param key Key for the blob.
+     * @param data Byte data that needs to be uploaded.
+     * @param meta Metadata which contains the acls information
+     * @param who Is the subject creating the blob.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     * @throws IOException
+     */
+    public void createBlob(String key, byte [] data, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
+        AtomicOutputStream out = null;
+        try {
+            out = createBlob(key, meta, who);
+            out.write(data);
+            out.close();
+            out = null;
+        } finally {
+            if (out != null) {
+                out.cancel();
+            }
+        }
+    }
+
+    /**
+     * Wrapper called to create the blob which contains
+     * the byte data
+     * @param key Key for the blob.
+     * @param in InputStream from which the data is read to be
+     * written as a part of the blob.
+     * @param meta Metadata which contains the acls information
+     * @param who Is the subject creating the blob.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     * @throws IOException
+     */
+    public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
+        AtomicOutputStream out = null;
+        try {
+            out = createBlob(key, meta, who);
+            byte[] buffer = new byte[2048];
+            int len = 0;
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+            out.close();
+        } catch (AuthorizationException | IOException | RuntimeException e) {
+            if (out !=null) {
+                out.cancel();
+            }
+        } finally {
+            in.close();
+        }
+    }
+
+    /**
+     * Reads the blob from the blob store
+     * and writes it into the output stream.
+     * @param key Key for the blob.
+     * @param out Output stream
+     * @param who Is the subject having read
+     * privilege for the blob.
+     * @throws IOException
+     * @throws KeyNotFoundException
+     * @throws AuthorizationException
+     */
+    public void readBlobTo(String key, OutputStream out, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
+        InputStreamWithMeta in = getBlob(key, who);
+        if (in == null) {
+            throw new IOException("Could not find " + key);
+        }
+        byte[] buffer = new byte[2048];
+        int len = 0;
+        try{
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+        } finally {
+            in.close();
+            out.flush();
+        }
+    }
+
+    /**
+     * Wrapper around readBlobTo which
+     * returns a ByteArray output stream.
+     * @param key  Key for the blob.
+     * @param who Is the subject having
+     * the read privilege for the blob.
+     * @return ByteArrayOutputStream
+     * @throws IOException
+     * @throws KeyNotFoundException
+     * @throws AuthorizationException
+     */
+    public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        readBlobTo(key, out, who);
+        byte[] bytes = out.toByteArray();
+        out.close();
+        return bytes;
+    }
+
+    /**
+     * Helper method to read a stored topology
+     * @param topoId the id of the topology to read
+     * @param who who to read it as
+     * @return the deserialized topology.
+     * @throws IOException on any error while reading the blob.
+     * @throws AuthorizationException if who is not allowed to read the blob
+     * @throws KeyNotFoundException if the blob could not be found
+     */
+    public StormTopology readTopology(String topoId, Subject who) throws KeyNotFoundException, AuthorizationException, IOException {
+        return Utils.deserialize(readBlob(ConfigUtils.masterStormCodeKey(topoId), who), StormTopology.class);
+    }
+    
+    /**
+     * Helper method to read a stored topology config
+     * @param topoId the id of the topology whose conf we are reading
+     * @param who who we are reading this as
+     * @return the deserialized config
+     * @throws KeyNotFoundException if the blob could not be found
+     * @throws AuthorizationException if who is not allowed to read the blob
+     * @throws IOException on any error while reading the blob.
+     */
+    public Map<String, Object> readTopologyConf(String topoId, Subject who) throws KeyNotFoundException, AuthorizationException, IOException {
+        return Utils.fromCompressedJsonConf(readBlob(ConfigUtils.masterStormConfKey(topoId), who));
+    }
+    
+    private static final KeyFilter<String> TO_TOPO_ID = (key) -> ConfigUtils.getIdFromBlobKey(key);
+    /**
+     * @return a set of all of the topology ids with special data stored in the
+     * blob store.
+     */
+    public Set<String> storedTopoIds() {
+        return filterAndListKeys(TO_TOPO_ID);
+    }
+    
+    /**
+     * Output stream implementation used for reading the
+     * metadata and data information.
+     */
+    protected class BlobStoreFileOutputStream extends AtomicOutputStream {
+        private BlobStoreFile part;
+        private OutputStream out;
+
+        public BlobStoreFileOutputStream(BlobStoreFile part) throws IOException {
+            this.part = part;
+            this.out = part.getOutputStream();
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                //close means commit
+                out.close();
+                part.commit();
+            } catch (IOException | RuntimeException e) {
+                cancel();
+                throw e;
+            }
+        }
+
+        @Override
+        public void cancel() throws IOException {
+            try {
+                out.close();
+            } finally {
+                part.cancel();
+            }
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte []b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte []b, int offset, int len) throws IOException {
+            out.write(b, offset, len);
+        }
+    }
+
+    /**
+     * Input stream implementation used for writing
+     * both the metadata containing the acl information
+     * and the blob data.
+     */
+    protected class BlobStoreFileInputStream extends InputStreamWithMeta {
+        private BlobStoreFile part;
+        private InputStream in;
+
+        public BlobStoreFileInputStream(BlobStoreFile part) throws IOException {
+            this.part = part;
+            this.in = part.getInputStream();
+        }
+
+        @Override
+        public long getVersion() throws IOException {
+            return part.getModTime();
+        }
+
+        @Override
+        public int read() throws IOException {
+            return in.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            return in.read(b, off, len);
+        }
+
+        @Override
+        public int read(byte[] b) throws IOException {
+            return in.read(b);
+        }
+
+        @Override
+        public int available() throws IOException {
+            return in.available();
+        }
+
+        @Override
+        public long getFileLength() throws IOException {
+            return part.getFileLength();
+        }
+
+        @Override
+        public void close() throws IOException {
+            in.close();
+        }
+    }
+
+    /**
+     * Blob store implements its own version of iterator
+     * to list the blobs
+     */
+    public static class KeyTranslationIterator implements Iterator<String> {
+        private Iterator<String> it = null;
+        private String next = null;
+        private String prefix = null;
+
+        public KeyTranslationIterator(Iterator<String> it, String prefix) throws IOException {
+            this.it = it;
+            this.prefix = prefix;
+            primeNext();
+        }
+
+        private void primeNext() {
+            next = null;
+            while (it.hasNext()) {
+                String tmp = it.next();
+                if (tmp.startsWith(prefix)) {
+                    next = tmp.substring(prefix.length());
+                    return;
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            String current = next;
+            primeNext();
+            return current;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Delete Not Supported");
+        }
+    }
+}