Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:35 UTC
[46/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
new file mode 100644
index 0000000..f49369a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm;
+
+import com.google.common.collect.Sets;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.blobstore.NimbusBlobStore;
+import org.apache.storm.dependency.DependencyPropertiesParser;
+import org.apache.storm.dependency.DependencyUploader;
+import org.apache.storm.hooks.SubmitterHookException;
+import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.validation.ConfigValidation;
+import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.TException;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.generated.*;
+import org.apache.storm.utils.BufferFileInputStream;
+import org.apache.storm.utils.NimbusClient;
+
+/**
+ * Use this class to submit topologies to run on the Storm cluster. You should run your program
+ * with the "storm jar" command from the command-line, and then use this class to
+ * submit your topologies.
+ */
+public class StormSubmitter {
+ public static final Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
+
+ // upload the topology jar to Nimbus in 300 KB (307200 byte) chunks
+ private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
+
+ private static ILocalCluster localNimbus = null;
+
+ private static String generateZookeeperDigestSecretPayload() {
+ return Utils.secureRandomLong() + ":" + Utils.secureRandomLong();
+ }
+
+ public static final Pattern zkDigestPattern = Pattern.compile("\\S+:\\S+");
+
+ public static boolean validateZKDigestPayload(String payload) {
+ if (payload != null) {
+ Matcher m = zkDigestPattern.matcher(payload);
+ return m.matches();
+ }
+ return false;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Map prepareZookeeperAuthentication(Map conf) {
+ Map toRet = new HashMap();
+
+ // Is the topology ZooKeeper authentication configuration unset?
+ if (! conf.containsKey(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) ||
+ conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) == null ||
+ ! validateZKDigestPayload((String)
+ conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD))) {
+
+ String secretPayload = generateZookeeperDigestSecretPayload();
+ toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, secretPayload);
+ LOG.info("Generated ZooKeeper secret payload for MD5-digest: " + secretPayload);
+ }
+
+ // This should always be set to digest.
+ toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest");
+
+ return toRet;
+ }
+
+ private static Map<String,String> populateCredentials(Map conf, Map<String, String> creds) {
+ Map<String,String> ret = new HashMap<>();
+ for (IAutoCredentials autoCred: AuthUtils.GetAutoCredentials(conf)) {
+ LOG.info("Running "+autoCred);
+ autoCred.populateCredentials(ret);
+ }
+ if (creds != null) {
+ ret.putAll(creds);
+ }
+ return ret;
+ }
+
+ /**
+ * Push a new set of credentials to the running topology.
+ * @param name the name of the topology to push credentials to.
+ * @param stormConf the topology-specific configuration, if desired. See {@link Config}.
+ * @param credentials the credentials to push.
+ * @throws AuthorizationException if you are not authorized to push credentials.
+ * @throws NotAliveException if the topology is not alive
+ * @throws InvalidTopologyException if any other error occurs
+ */
+ public static void pushCredentials(String name, Map stormConf, Map<String, String> credentials)
+ throws AuthorizationException, NotAliveException, InvalidTopologyException {
+ stormConf = new HashMap(stormConf);
+ stormConf.putAll(Utils.readCommandLineOpts());
+ Map conf = Utils.readStormConfig();
+ conf.putAll(stormConf);
+ Map<String,String> fullCreds = populateCredentials(conf, credentials);
+ if (fullCreds.isEmpty()) {
+ LOG.warn("No credentials were found to push to " + name);
+ return;
+ }
+ try {
+ if (localNimbus!=null) {
+ LOG.info("Pushing Credentials to topology {} in local mode", name);
+ localNimbus.uploadNewCredentials(name, new Credentials(fullCreds));
+ } else {
+ try(NimbusClient client = NimbusClient.getConfiguredClient(conf)) {
+ LOG.info("Uploading new credentials to {}", name);
+ client.getClient().uploadNewCredentials(name, new Credentials(fullCreds));
+ }
+ }
+ LOG.info("Finished pushing creds to topology: {}", name);
+ } catch(TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ /**
+ * Submits a topology to run on the cluster. A topology runs forever or until
+ * explicitly killed.
+ *
+ * @param name the name of the topology.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ * @throws AuthorizationException if authorization fails
+ * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+ */
+ public static void submitTopology(String name, Map stormConf, StormTopology topology)
+ throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ submitTopology(name, stormConf, topology, null, null);
+ }
+
+ /**
+ * Submits a topology to run on the cluster. A topology runs forever or until
+ * explicitly killed.
+ *
+ * @param name the name of the topology.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @param opts to manipulate the starting of the topology.
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ * @throws AuthorizationException if authorization fails
+ * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+ */
+ public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
+ throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ submitTopology(name, stormConf, topology, opts, null);
+ }
+
+ /**
+ * Submits a topology to run on the cluster as a particular user. A topology runs forever or until
+ * explicitly killed.
+ *
+ * @param name the name of the topology.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @param opts to manipulate the starting of the topology.
+ * @param progressListener to track the progress of the jar upload process.
+ * @param asUser The user as which this topology should be submitted.
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ * @throws AuthorizationException if authorization fails
+ * @throws IllegalArgumentException if the configs would yield an unschedulable topology (checked by validateConfs)
+ * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+ */
+ public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
+ throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
+ if(!Utils.isValidConf(stormConf)) {
+ throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
+ }
+ stormConf = new HashMap(stormConf);
+ stormConf.putAll(Utils.readCommandLineOpts());
+ Map conf = Utils.readStormConfig();
+ conf.putAll(stormConf);
+ stormConf.putAll(prepareZookeeperAuthentication(conf));
+
+ validateConfs(conf, topology);
+
+ Map<String,String> passedCreds = new HashMap<>();
+ if (opts != null) {
+ Credentials tmpCreds = opts.get_creds();
+ if (tmpCreds != null) {
+ passedCreds = tmpCreds.get_creds();
+ }
+ }
+ Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
+ if (!fullCreds.isEmpty()) {
+ if (opts == null) {
+ opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
+ }
+ opts.set_creds(new Credentials(fullCreds));
+ }
+ try {
+ if (localNimbus!=null) {
+ LOG.info("Submitting topology " + name + " in local mode");
+ if (opts!=null) {
+ localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
+ } else {
+ // this is for backwards compatibility
+ localNimbus.submitTopology(name, stormConf, topology);
+ }
+ LOG.info("Finished submitting topology: " + name);
+ } else {
+ String serConf = JSONValue.toJSONString(stormConf);
+ try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
+ if (topologyNameExists(name, client)) {
+ throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+ }
+
+ // Dependency uploading only makes sense for distributed mode
+ List<String> jarsBlobKeys = Collections.emptyList();
+ List<String> artifactsBlobKeys;
+
+ DependencyUploader uploader = new DependencyUploader();
+ try {
+ uploader.init();
+
+ jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);
+
+ artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
+ } catch (Throwable e) {
+ // remove uploaded jars blobs, not artifacts since they're shared across the cluster
+ uploader.deleteBlobs(jarsBlobKeys);
+ uploader.shutdown();
+ throw e;
+ }
+
+ try {
+ setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
+ submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client);
+ } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
+ // remove uploaded jars blobs, not artifacts since they're shared across the cluster.
+ // Note that we deliberately don't catch TException here to delete the jars blobs,
+ // because it's safer to leave some blobs behind than to risk the topology not running
+ uploader.deleteBlobs(jarsBlobKeys);
+ throw e;
+ } finally {
+ uploader.shutdown();
+ }
+ }
+ }
+ } catch(TException e) {
+ throw new RuntimeException(e);
+ }
+ invokeSubmitterHook(name, asUser, conf, topology);
+
+ }
+
+ private static List<String> uploadDependencyJarsToBlobStore(DependencyUploader uploader) {
+ LOG.info("Uploading dependencies - jars...");
+
+ DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();
+
+ String depJarsProp = System.getProperty("storm.dependency.jars", "");
+ List<File> depJars = propertiesParser.parseJarsProperties(depJarsProp);
+
+ try {
+ return uploader.uploadFiles(depJars, true);
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static List<String> uploadDependencyArtifactsToBlobStore(DependencyUploader uploader) {
+ LOG.info("Uploading dependencies - artifacts...");
+
+ DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();
+
+ String depArtifactsProp = System.getProperty("storm.dependency.artifacts", "{}");
+ Map<String, File> depArtifacts = propertiesParser.parseArtifactsProperties(depArtifactsProp);
+
+ try {
+ return uploader.uploadArtifacts(depArtifacts);
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void setDependencyBlobsToTopology(StormTopology topology, List<String> jarsBlobKeys, List<String> artifactsBlobKeys) {
+ LOG.info("Dependency Blob keys - jars : {} / artifacts : {}", jarsBlobKeys, artifactsBlobKeys);
+ topology.set_dependency_jars(jarsBlobKeys);
+ topology.set_dependency_artifacts(artifactsBlobKeys);
+ }
+
+ private static void submitTopologyInDistributeMode(String name, StormTopology topology, SubmitOptions opts,
+ ProgressListener progressListener, String asUser, Map conf,
+ String serConf, NimbusClient client) throws TException {
+ try {
+ String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client);
+ LOG.info("Submitting topology {} in distributed mode with conf {}", name, serConf);
+
+ if (opts != null) {
+ client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
+ } else {
+ // this is for backwards compatibility
+ client.getClient().submitTopology(name, jar, serConf, topology);
+ }
+ LOG.info("Finished submitting topology: {}", name);
+ } catch (InvalidTopologyException e) {
+ LOG.warn("Topology submission exception: {}", e.get_msg());
+ throw e;
+ } catch (AlreadyAliveException e) {
+ LOG.warn("Topology already alive exception", e);
+ throw e;
+ }
+ }
+
+ /**
+ * Invokes the registered {@link ISubmitterHook}, if one is configured, after the topology is submitted.
+ * @param name the name of the topology.
+ * @param asUser the user as which the topology was submitted.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ *
+ * @throws SubmitterHookException if any Exception occurs during initialization or invocation of the registered {@link ISubmitterHook}
+ */
+ private static void invokeSubmitterHook(String name, String asUser, Map stormConf, StormTopology topology) {
+ String submissionNotifierClassName = null;
+ try {
+ if (stormConf.containsKey(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN)) {
+ submissionNotifierClassName = stormConf.get(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN).toString();
+ LOG.info("Initializing the registered ISubmitterHook [{}]", submissionNotifierClassName);
+
+ if(submissionNotifierClassName == null || submissionNotifierClassName.isEmpty()) {
+ throw new IllegalArgumentException(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN + " property must be a non empty string.");
+ }
+
+ ISubmitterHook submitterHook = (ISubmitterHook) Class.forName(submissionNotifierClassName).newInstance();
+ TopologyInfo topologyInfo = Utils.getTopologyInfo(name, asUser, stormConf);
+ LOG.info("Invoking the registered ISubmitterHook [{}]", submissionNotifierClassName);
+ submitterHook.notify(topologyInfo, stormConf, topology);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error occurred in invoking submitter hook:[{}] ",submissionNotifierClassName, e);
+ throw new SubmitterHookException(e);
+ }
+ }
+
+ /**
+ * Submits a topology to run on the cluster. A topology runs forever or until
+ * explicitly killed.
+ *
+ * @param name the name of the topology.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @param opts to manipulate the starting of the topology
+ * @param progressListener to track the progress of the jar upload process
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ * @throws AuthorizationException if authorization fails
+ * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+ */
+ @SuppressWarnings("unchecked")
+ public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
+ ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ submitTopologyAs(name, stormConf, topology, opts, progressListener, null);
+ }
+
+ /**
+ * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+ * explicitly killed.
+ *
+ * @param name the name of the topology.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ * @throws AuthorizationException if authorization fails
+ */
+ public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ submitTopologyWithProgressBar(name, stormConf, topology, null);
+ }
+
+ /**
+ * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+ * explicitly killed.
+ *
+ * @param name the name of the topology.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @param opts to manipulate the starting of the topology
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ * @throws AuthorizationException if authorization fails
+ * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+ */
+ public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ // show a progress bar so we know we're not stuck (especially on slow connections)
+ submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
+ @Override
+ public void onStart(String srcFile, String targetFile, long totalBytes) {
+ System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+ }
+
+ @Override
+ public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
+ int length = 50;
+ int p = (int)((length * bytesUploaded) / totalBytes);
+ String progress = StringUtils.repeat("=", p);
+ String todo = StringUtils.repeat(" ", length - p);
+
+ System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
+ }
+
+ @Override
+ public void onCompleted(String srcFile, String targetFile, long totalBytes) {
+ System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+ }
+ });
+ }
+
+ private static boolean topologyNameExists(String name, NimbusClient client) {
+ try {
+ return !client.getClient().isTopologyNameAllowed(name);
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static String submitJar(Map conf, ProgressListener listener) {
+ return submitJar(conf, System.getProperty("storm.jar"), listener);
+ }
+
+ /**
+ * Submit jar file
+ * @param conf the topology-specific configuration. See {@link Config}.
+ * @param localJar file path of the jar file to submit
+ * @return the remote location of the submitted jar
+ */
+ public static String submitJar(Map conf, String localJar) {
+ return submitJar(conf, localJar, null);
+ }
+
+ public static String submitJarAs(Map conf, String localJar, ProgressListener listener, NimbusClient client) {
+ if (localJar == null) {
+ throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+ }
+
+ try {
+ String uploadLocation = client.getClient().beginFileUpload();
+ LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
+ BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);
+
+ long totalSize = new File(localJar).length();
+ if (listener != null) {
+ listener.onStart(localJar, uploadLocation, totalSize);
+ }
+
+ long bytesUploaded = 0;
+ while(true) {
+ byte[] toSubmit = is.read();
+ bytesUploaded += toSubmit.length;
+ if (listener != null) {
+ listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
+ }
+
+ if(toSubmit.length==0) break;
+ client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
+ }
+ client.getClient().finishFileUpload(uploadLocation);
+
+ if (listener != null) {
+ listener.onCompleted(localJar, uploadLocation, totalSize);
+ }
+
+ LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
+ return uploadLocation;
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static String submitJarAs(Map conf, String localJar, ProgressListener listener, String asUser) {
+ if (localJar == null) {
+ throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+ }
+
+ try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
+ return submitJarAs(conf, localJar, listener, client);
+ }
+ }
+
+ /**
+ * Submit jar file
+ * @param conf the topology-specific configuration. See {@link Config}.
+ * @param localJar file path of the jar file to submit
+ * @param listener progress listener to track the jar file upload
+ * @return the remote location of the submitted jar
+ */
+ public static String submitJar(Map conf, String localJar, ProgressListener listener) {
+ return submitJarAs(conf,localJar, listener, (String)null);
+ }
+
+ /**
+ * Interface used to track the progress of a file upload
+ */
+ public interface ProgressListener {
+ /**
+ * called before file is uploaded
+ * @param srcFile - jar file to be uploaded
+ * @param targetFile - destination file
+ * @param totalBytes - total number of bytes of the file
+ */
+ public void onStart(String srcFile, String targetFile, long totalBytes);
+
+ /**
+ * called whenever a chunk of bytes is uploaded
+ * @param srcFile - jar file to be uploaded
+ * @param targetFile - destination file
+ * @param bytesUploaded - number of bytes transferred so far
+ * @param totalBytes - total number of bytes of the file
+ */
+ public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes);
+
+ /**
+ * called when the file is uploaded
+ * @param srcFile - jar file to be uploaded
+ * @param targetFile - destination file
+ * @param totalBytes - total number of bytes of the file
+ */
+ public void onCompleted(String srcFile, String targetFile, long totalBytes);
+ }
+
+ private static void validateConfs(Map<String, Object> stormConf, StormTopology topology) throws IllegalArgumentException, InvalidTopologyException {
+ ConfigValidation.validateFields(stormConf);
+ validateTopologyWorkerMaxHeapSizeMBConfigs(stormConf, topology);
+ Utils.validateTopologyBlobStoreMap(stormConf, getListOfKeysFromBlobStore(stormConf));
+ }
+
+ private static void validateTopologyWorkerMaxHeapSizeMBConfigs(Map<String, Object> stormConf, StormTopology topology) {
+ double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
+ Double topologyWorkerMaxHeapSize = ObjectReader.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
+ if(topologyWorkerMaxHeapSize < largestMemReq) {
+ throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
+ + ObjectReader.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < "
+ + largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount");
+ }
+ }
+
+ private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map topologyConf) {
+ double largestMemoryOperator = 0.0;
+ for(Map<String, Double> entry : ResourceUtils.getBoltsResources(topology, topologyConf).values()) {
+ double memoryRequirement = entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
+ + entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ if(memoryRequirement > largestMemoryOperator) {
+ largestMemoryOperator = memoryRequirement;
+ }
+ }
+ for(Map<String, Double> entry : ResourceUtils.getSpoutsResources(topology, topologyConf).values()) {
+ double memoryRequirement = entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
+ + entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ if(memoryRequirement > largestMemoryOperator) {
+ largestMemoryOperator = memoryRequirement;
+ }
+ }
+ return largestMemoryOperator;
+ }
+
+ private static Set<String> getListOfKeysFromBlobStore(Map<String, Object> stormConf) {
+ try (NimbusBlobStore client = new NimbusBlobStore()) {
+ client.prepare(stormConf);
+ return Sets.newHashSet(client.listKeys());
+ }
+ }
+}
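
For orientation, here is a minimal submission sketch against the StormSubmitter API above. MySpout and MyBolt are hypothetical user-supplied components; TopologyBuilder and Config are the standard Storm classes referenced elsewhere in this commit.

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;

    public class SubmitExample {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // MySpout and MyBolt are hypothetical IRichSpout/IRichBolt implementations
            builder.setSpout("words", new MySpout(), 2);
            builder.setBolt("counts", new MyBolt(), 4).shuffleGrouping("words");

            Config conf = new Config();
            conf.setNumWorkers(2);

            // run via `storm jar` so the "storm.jar" system property tells
            // StormSubmitter which jar to upload (see submitJarAs above)
            StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
        }
    }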
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormTimer.java b/storm-client/src/jvm/org/apache/storm/StormTimer.java
new file mode 100644
index 0000000..4f6a7d5
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/StormTimer.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Time;
+
+import java.nio.channels.ClosedByInterruptException;
+import java.util.Comparator;
+import java.util.Random;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The timer defined in this file is very similar to java.util.Timer, except
+ * it integrates with Storm's time simulation capabilities. This lets us test
+ * code that does asynchronous work on the timer thread.
+ */
+
+public class StormTimer implements AutoCloseable {
+
+ public static class QueueEntry {
+ public final Long endTimeMs;
+ public final Runnable func;
+ public final String id;
+
+ public QueueEntry(Long endTimeMs, Runnable func, String id) {
+ this.endTimeMs = endTimeMs;
+ this.func = func;
+ this.id = id;
+ }
+ }
+
+ public static class StormTimerTask extends Thread {
+
+ // initialCapacity set to 11 since it's the default initial capacity of PriorityBlockingQueue
+ private PriorityBlockingQueue<QueueEntry> queue = new PriorityBlockingQueue<QueueEntry>(11, new Comparator<QueueEntry>() {
+ @Override
+ public int compare(QueueEntry o1, QueueEntry o2) {
+ // compare the full long values; subtracting truncated intValue()s can overflow and misorder entries
+ return Long.compare(o1.endTimeMs, o2.endTimeMs);
+ }
+ });
+
+ // boolean to indicate whether timer is active
+ private AtomicBoolean active = new AtomicBoolean(false);
+
+ // function to call when timer is killed
+ private Thread.UncaughtExceptionHandler onKill;
+
+ //random number generator
+ private Random random = new Random();
+
+ @Override
+ public void run() {
+ while (this.active.get()) {
+ QueueEntry queueEntry = null;
+ try {
+ queueEntry = this.queue.peek();
+ if ((queueEntry != null) && (Time.currentTimeMillis() >= queueEntry.endTimeMs)) {
+ // It is imperative to not run the function
+ // inside the timer lock. Otherwise, it is
+ // possible to deadlock if the fn deals with
+ // other locks, like the submit lock.
+ this.queue.remove(queueEntry);
+ queueEntry.func.run();
+ } else if (queueEntry != null) {
+ // If any events are scheduled, sleep until
+ // event generation. If any recurring events
+ // are scheduled then we will always go
+ // through this branch, sleeping only the
+ // exact necessary amount of time. We give
+ // an upper bound, e.g. 1000 millis, to the
+ // sleeping time, to limit the response time
+ // for detecting any new event within 1 sec.
+ Time.sleep(Math.min(1000, (queueEntry.endTimeMs - Time.currentTimeMillis())));
+ } else {
+ // Otherwise poll to see if any new event
+ // was scheduled. This is, in essence, the
+ // response time for detecting any new event
+ // schedulings when there are no scheduled
+ // events.
+ Time.sleep(1000);
+ }
+ } catch (Throwable e) {
+ if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e))
+ && !(Utils.exceptionCauseIsInstanceOf(ClosedByInterruptException.class, e))) {
+ this.onKill.uncaughtException(this, e);
+ this.setActive(false);
+ }
+ }
+ }
+ }
+
+ public void setOnKillFunc(Thread.UncaughtExceptionHandler onKill) {
+ this.onKill = onKill;
+ }
+
+ public void setActive(boolean flag) {
+ this.active.set(flag);
+ }
+
+ public boolean isActive() {
+ return this.active.get();
+ }
+
+ public void add(QueueEntry queueEntry) {
+ this.queue.add(queueEntry);
+ }
+ }
+
+ //task to run
+ private StormTimerTask task = new StormTimerTask();
+
+ /**
+ * Creates a StormTimer and starts its background StormTimerTask thread.
+ * @param name name of the timer thread; defaults to "timer" when null
+ * @param onKill function to call when the timer is killed unexpectedly
+ */
+ public StormTimer(String name, Thread.UncaughtExceptionHandler onKill) {
+ if (onKill == null) {
+ throw new RuntimeException("onKill func is null!");
+ }
+ if (name == null) {
+ this.task.setName("timer");
+ } else {
+ this.task.setName(name);
+ }
+ this.task.setOnKillFunc(onKill);
+ this.task.setActive(true);
+
+ this.task.setDaemon(true);
+ this.task.setPriority(Thread.MAX_PRIORITY);
+ this.task.start();
+ }
+
+ /**
+ * Schedule a function to be executed in the timer
+ * @param delaySecs the number of seconds to delay before running the function
+ * @param func the function to run
+ * @param checkActive whether to check if the timer is active
+ * @param jitterMs maximum random jitter, in milliseconds, added to the scheduled time
+ */
+ public void schedule(int delaySecs, Runnable func, boolean checkActive, int jitterMs) {
+ if (func == null) {
+ throw new RuntimeException("function to schedule is null!");
+ }
+ if (checkActive) {
+ checkActive();
+ }
+ String id = Utils.uuid();
+ long endTimeMs = Time.currentTimeMillis() + Time.secsToMillisLong(delaySecs);
+ if (jitterMs > 0) {
+ endTimeMs = this.task.random.nextInt(jitterMs) + endTimeMs;
+ }
+ task.add(new QueueEntry(endTimeMs, func, id));
+ }
+
+ public void schedule(int delaySecs, Runnable func) {
+ schedule(delaySecs, func, true, 0);
+ }
+
+ /**
+ * Schedule a function to run recurrently
+ * @param delaySecs the number of seconds to delay before running the function
+ * @param recurSecs the time between each invocation
+ * @param func the function to run
+ */
+ public void scheduleRecurring(int delaySecs, final int recurSecs, final Runnable func) {
+ schedule(delaySecs, new Runnable() {
+ @Override
+ public void run() {
+ func.run();
+ // This avoids a race condition with cancel-timer.
+ schedule(recurSecs, this, false, 0);
+ }
+ });
+ }
+
+ /**
+ * schedule a function to run recurrently with jitter
+ * @param delaySecs the number of seconds to delay before running the function
+ * @param recurSecs the time between each invocation
+ * @param jitterMs maximum random jitter, in milliseconds, added to each run
+ * @param func the function to run
+ */
+ public void scheduleRecurringWithJitter(int delaySecs, final int recurSecs, final int jitterMs, final Runnable func) {
+ schedule(delaySecs, new Runnable() {
+ @Override
+ public void run() {
+ func.run();
+ // This avoids a race condition with cancel-timer.
+ schedule(recurSecs, this, false, jitterMs);
+ }
+ });
+ }
+
+ /**
+ * check if timer is active
+ */
+ private void checkActive() {
+ if (!this.task.isActive()) {
+ throw new IllegalStateException("Timer is not active");
+ }
+ }
+
+ /**
+ * Cancels the timer: stops the timer thread and waits for it to exit.
+ */
+ @Override
+ public void close() throws Exception {
+ if (this.task.isActive()) {
+ this.task.setActive(false);
+ this.task.interrupt();
+ this.task.join();
+ }
+ }
+
+ /**
+ * Checks if the timer thread is waiting. Used in timer simulation.
+ */
+ public boolean isTimerWaiting() {
+ return Time.isThreadWaiting(task);
+ }
+}
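
A short usage sketch for StormTimer, using only the constructor and schedule methods shown above:

    import org.apache.storm.StormTimer;

    public class TimerExample {
        public static void main(String[] args) throws Exception {
            // the handler runs if a scheduled task throws unexpectedly
            StormTimer timer = new StormTimer("example-timer",
                (thread, error) -> System.err.println("timer died: " + error));

            // run once, 5 seconds from now
            timer.schedule(5, () -> System.out.println("one-shot task"));

            // run every 10 seconds after an initial 10 second delay,
            // with up to 500 ms of random jitter per run
            timer.scheduleRecurringWithJitter(10, 10, 500,
                () -> System.out.println("recurring task"));

            Thread.sleep(30000);
            timer.close(); // deactivates, interrupts and joins the timer thread
        }
    }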
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/Thrift.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Thrift.java b/storm-client/src/jvm/org/apache/storm/Thrift.java
new file mode 100644
index 0000000..2e5080a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/Thrift.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm;
+
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.JavaObjectArg;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StreamInfo;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StormTopology._Fields;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.NullStruct;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.ComponentObject;
+
+import org.apache.storm.task.IBolt;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.SpoutDeclarer;
+import org.apache.storm.utils.Utils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.topology.TopologyBuilder;
+
+public class Thrift {
+ private static Logger LOG = LoggerFactory.getLogger(Thrift.class);
+
+ private static StormTopology._Fields[] STORM_TOPOLOGY_FIELDS = null;
+ private static StormTopology._Fields[] SPOUT_FIELDS =
+ { StormTopology._Fields.SPOUTS, StormTopology._Fields.STATE_SPOUTS };
+
+ static {
+ Set<_Fields> keys = StormTopology.metaDataMap.keySet();
+ keys.toArray(STORM_TOPOLOGY_FIELDS = new StormTopology._Fields[keys.size()]);
+ }
+
+ public static StormTopology._Fields[] getTopologyFields() {
+ return STORM_TOPOLOGY_FIELDS;
+ }
+
+ public static StormTopology._Fields[] getSpoutFields() {
+ return SPOUT_FIELDS;
+ }
+
+ public static class SpoutDetails {
+ private IRichSpout spout;
+ private Integer parallelism;
+ private Map conf;
+
+ public SpoutDetails(IRichSpout spout, Integer parallelism, Map conf) {
+ this.spout = spout;
+ this.parallelism = parallelism;
+ this.conf = conf;
+ }
+
+ public IRichSpout getSpout() {
+ return spout;
+ }
+
+ public Integer getParallelism() {
+ return parallelism;
+ }
+
+ public Map getConf() {
+ return conf;
+ }
+ }
+
+ public static class BoltDetails {
+ private Object bolt;
+ private Map conf;
+ private Integer parallelism;
+ private Map<GlobalStreamId, Grouping> inputs;
+
+ public BoltDetails(Object bolt, Map conf, Integer parallelism,
+ Map<GlobalStreamId, Grouping> inputs) {
+ this.bolt = bolt;
+ this.conf = conf;
+ this.parallelism = parallelism;
+ this.inputs = inputs;
+ }
+
+ public Object getBolt() {
+ return bolt;
+ }
+
+ public Map getConf() {
+ return conf;
+ }
+
+ public Map<GlobalStreamId, Grouping> getInputs() {
+ return inputs;
+ }
+
+ public Integer getParallelism() {
+ return parallelism;
+ }
+ }
+
+ public static StreamInfo directOutputFields(List<String> fields) {
+ return new StreamInfo(fields, true);
+ }
+
+ public static StreamInfo outputFields(List<String> fields) {
+ return new StreamInfo(fields, false);
+ }
+
+ public static Grouping prepareShuffleGrouping() {
+ return Grouping.shuffle(new NullStruct());
+ }
+
+ public static Grouping prepareLocalOrShuffleGrouping() {
+ return Grouping.local_or_shuffle(new NullStruct());
+ }
+
+ public static Grouping prepareFieldsGrouping(List<String> fields) {
+ return Grouping.fields(fields);
+ }
+
+ public static Grouping prepareGlobalGrouping() {
+ return prepareFieldsGrouping(new ArrayList<String>());
+ }
+
+ public static Grouping prepareDirectGrouping() {
+ return Grouping.direct(new NullStruct());
+ }
+
+ public static Grouping prepareAllGrouping() {
+ return Grouping.all(new NullStruct());
+ }
+
+ public static Grouping prepareNoneGrouping() {
+ return Grouping.none(new NullStruct());
+ }
+
+ public static Grouping prepareCustomStreamGrouping(Object obj) {
+ return Grouping.custom_serialized(Utils.javaSerialize(obj));
+ }
+
+ public static Grouping prepareCustomJavaObjectGrouping(JavaObject obj) {
+ return Grouping.custom_object(obj);
+ }
+
+ public static Object instantiateJavaObject(JavaObject obj) {
+
+ List<JavaObjectArg> args = obj.get_args_list();
+ Class[] paraTypes = new Class[args.size()];
+ Object[] paraValues = new Object[args.size()];
+ for (int i = 0; i < args.size(); i++) {
+ JavaObjectArg arg = args.get(i);
+ paraValues[i] = arg.getFieldValue();
+
+ if (arg.getSetField().equals(JavaObjectArg._Fields.INT_ARG)) {
+ paraTypes[i] = Integer.class;
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.LONG_ARG)) {
+ paraTypes[i] = Long.class;
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.STRING_ARG)) {
+ paraTypes[i] = String.class;
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.BOOL_ARG)) {
+ paraTypes[i] = Boolean.class;
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.BINARY_ARG)) {
+ paraTypes[i] = ByteBuffer.class;
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.DOUBLE_ARG)) {
+ paraTypes[i] = Double.class;
+ } else {
+ paraTypes[i] = Object.class;
+ }
+ }
+
+ try {
+ Class clazz = Class.forName(obj.get_full_class_name());
+ Constructor cons = clazz.getConstructor(paraTypes);
+ return cons.newInstance(paraValues);
+ } catch (Exception e) {
+ LOG.error("java object instantiation failed", e);
+ }
+
+ return null;
+
+ }
+
+ public static Grouping._Fields groupingType(Grouping grouping) {
+ return grouping.getSetField();
+ }
+
+ public static List<String> fieldGrouping(Grouping grouping) {
+ if (!Grouping._Fields.FIELDS.equals(groupingType(grouping))) {
+ throw new IllegalArgumentException("Tried to get grouping fields from non fields grouping");
+ }
+ return grouping.get_fields();
+ }
+
+ public static boolean isGlobalGrouping(Grouping grouping) {
+ if (Grouping._Fields.FIELDS.equals(groupingType(grouping))) {
+ return fieldGrouping(grouping).isEmpty();
+ }
+
+ return false;
+ }
+
+ public static int getParallelismHint(ComponentCommon componentCommon) {
+ if (!componentCommon.is_set_parallelism_hint()) {
+ return 1;
+ } else {
+ return componentCommon.get_parallelism_hint();
+ }
+ }
+
+ public static ComponentObject serializeComponentObject(Object obj) {
+ return ComponentObject.serialized_java(Utils.javaSerialize(obj));
+ }
+
+ public static Object deserializeComponentObject(ComponentObject obj) {
+ if (obj.getSetField() != ComponentObject._Fields.SERIALIZED_JAVA) {
+ throw new RuntimeException("Cannot deserialize non-java-serialized object");
+ }
+ return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class);
+ }
+
+ public static ComponentCommon prepareComponentCommon(Map<GlobalStreamId, Grouping> inputs, Map<String,
+ StreamInfo> outputs, Integer parallelismHint) {
+ return prepareComponentCommon(inputs, outputs, parallelismHint, null);
+ }
+
+ public static ComponentCommon prepareComponentCommon(Map<GlobalStreamId, Grouping> inputs, Map<String, StreamInfo> outputs,
+ Integer parallelismHint, Map conf) {
+ Map<GlobalStreamId, Grouping> mappedInputs = new HashMap<>();
+ Map<String, StreamInfo> mappedOutputs = new HashMap<>();
+ if (inputs != null && !inputs.isEmpty()) {
+ mappedInputs.putAll(inputs);
+ }
+ if (outputs !=null && !outputs.isEmpty()) {
+ mappedOutputs.putAll(outputs);
+ }
+ ComponentCommon component = new ComponentCommon(mappedInputs, mappedOutputs);
+ if (parallelismHint != null) {
+ component.set_parallelism_hint(parallelismHint);
+ }
+ if (conf != null) {
+ component.set_json_conf(JSONValue.toJSONString(conf));
+ }
+ return component;
+ }
+
+ public static SpoutSpec prepareSerializedSpoutDetails(IRichSpout spout, Map<String, StreamInfo> outputs) {
+ return new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)),
+ prepareComponentCommon(new HashMap<>(), outputs, null, null));
+ }
+
+ public static Bolt prepareSerializedBoltDetails(Map<GlobalStreamId, Grouping> inputs, IBolt bolt, Map<String, StreamInfo> outputs,
+ Integer parallelismHint, Map conf) {
+ ComponentCommon common = prepareComponentCommon(inputs, outputs, parallelismHint, conf);
+ return new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common);
+ }
+
+ public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt) {
+ return prepareBoltDetails(inputs, bolt, null, null);
+ }
+
+ public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt,
+ Integer parallelismHint) {
+ return prepareBoltDetails(inputs, bolt, parallelismHint, null);
+ }
+
+ public static BoltDetails prepareBoltDetails(Map<GlobalStreamId, Grouping> inputs, Object bolt,
+ Integer parallelismHint, Map conf) {
+ BoltDetails details = new BoltDetails(bolt, conf, parallelismHint, inputs);
+ return details;
+ }
+
+ public static SpoutDetails prepareSpoutDetails(IRichSpout spout) {
+ return prepareSpoutDetails(spout, null, null);
+ }
+
+ public static SpoutDetails prepareSpoutDetails(IRichSpout spout, Integer parallelismHint) {
+ return prepareSpoutDetails(spout, parallelismHint, null);
+ }
+
+ public static SpoutDetails prepareSpoutDetails(IRichSpout spout, Integer parallelismHint, Map conf) {
+ SpoutDetails details = new SpoutDetails(spout, parallelismHint, conf);
+ return details;
+ }
+
+ public static StormTopology buildTopology(HashMap<String, SpoutDetails> spoutMap,
+ HashMap<String, BoltDetails> boltMap, HashMap<String, StateSpoutSpec> stateMap) {
+ // state spouts are not wired into the topology here, so stateMap is ignored
+ return buildTopology(spoutMap, boltMap);
+ }
+
+ private static void addInputs(BoltDeclarer declarer, Map<GlobalStreamId, Grouping> inputs) {
+ for(Entry<GlobalStreamId, Grouping> entry : inputs.entrySet()) {
+ declarer.grouping(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public static StormTopology buildTopology(Map<String, SpoutDetails> spoutMap, Map<String, BoltDetails> boltMap) {
+ TopologyBuilder builder = new TopologyBuilder();
+ for (Entry<String, SpoutDetails> entry : spoutMap.entrySet()) {
+ String spoutID = entry.getKey();
+ SpoutDetails spec = entry.getValue();
+ SpoutDeclarer spoutDeclarer = builder.setSpout(spoutID, spec.getSpout(), spec.getParallelism());
+ spoutDeclarer.addConfigurations(spec.getConf());
+ }
+ for (Entry<String, BoltDetails> entry : boltMap.entrySet()) {
+ String boltId = entry.getKey();
+ BoltDetails spec = entry.getValue();
+ BoltDeclarer boltDeclarer = null;
+ if (spec.getBolt() instanceof IRichBolt) {
+ boltDeclarer = builder.setBolt(boltId, (IRichBolt)spec.getBolt(), spec.getParallelism());
+ } else {
+ boltDeclarer = builder.setBolt(boltId, (IBasicBolt)spec.getBolt(), spec.getParallelism());
+ }
+ boltDeclarer.addConfigurations(spec.getConf());
+ addInputs(boltDeclarer, spec.getInputs());
+ }
+ return builder.createTopology();
+ }
+}
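
As a sketch of how these Thrift helpers compose: prepareSpoutDetails, prepareBoltDetails, prepareShuffleGrouping and buildTopology come from the file above, GlobalStreamId is the generated Thrift type, and MySpout/MyBolt are hypothetical components. "default" is Storm's default stream id.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.Thrift;
    import org.apache.storm.generated.GlobalStreamId;
    import org.apache.storm.generated.Grouping;
    import org.apache.storm.generated.StormTopology;

    public class ThriftBuildExample {
        public static StormTopology build() {
            Map<String, Thrift.SpoutDetails> spouts = new HashMap<>();
            // MySpout is a hypothetical IRichSpout implementation
            spouts.put("words", Thrift.prepareSpoutDetails(new MySpout(), 2));

            // subscribe the bolt to the spout's default stream, shuffle-grouped
            Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
            inputs.put(new GlobalStreamId("words", "default"), Thrift.prepareShuffleGrouping());

            Map<String, Thrift.BoltDetails> bolts = new HashMap<>();
            // MyBolt is a hypothetical IRichBolt implementation
            bolts.put("counts", Thrift.prepareBoltDetails(inputs, new MyBolt(), 4));

            return Thrift.buildTopology(spouts, bolts);
        }
    }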
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/annotation/InterfaceStability.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/annotation/InterfaceStability.java b/storm-client/src/jvm/org/apache/storm/annotation/InterfaceStability.java
new file mode 100644
index 0000000..d05ae75
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/annotation/InterfaceStability.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Annotation to inform users of how much to rely on a particular package,
+ * class or method not changing over time.
+ */
+@InterfaceStability.Evolving
+public class InterfaceStability {
+ /**
+ * Can evolve while retaining compatibility for minor release boundaries;
+ * can break compatibility only at major release (i.e. at m.0).
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface Stable {};
+
+ /**
+ * Evolving, but can break compatibility at minor release (i.e. m.x)
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface Evolving {};
+
+ /**
+ * No guarantee is provided as to reliability or stability across any
+ * level of release granularity.
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface Unstable {};
+}
+
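Usage is a plain class-level (or method-level) annotation; since the annotations are retained at runtime, tooling can inspect them reflectively. For example:

    import org.apache.storm.annotation.InterfaceStability;

    // callers should expect this API to change between minor releases
    @InterfaceStability.Evolving
    public class ExperimentalScheduler {
        // ...
    }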
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/AtomicOutputStream.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/AtomicOutputStream.java b/storm-client/src/jvm/org/apache/storm/blobstore/AtomicOutputStream.java
new file mode 100644
index 0000000..892e25c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/AtomicOutputStream.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An output stream where all of the data is committed on close,
+ * or can be canceled with cancel.
+ */
+public abstract class AtomicOutputStream extends OutputStream {
+ /**
+ * Cancel all of the writes associated with this stream and close it.
+ */
+ public abstract void cancel() throws IOException;
+}
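
To make the commit-on-close contract concrete, here is a minimal hypothetical in-memory subclass (not part of this commit): writes are buffered and only published, all at once, when close() succeeds; cancel() discards them.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Map;

    import org.apache.storm.blobstore.AtomicOutputStream;

    // illustrative only: buffers writes, commits on close, discards on cancel
    public class InMemoryAtomicOutputStream extends AtomicOutputStream {
        private final Map<String, byte[]> store;
        private final String key;
        private ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        public InMemoryAtomicOutputStream(Map<String, byte[]> store, String key) {
            this.store = store;
            this.key = key;
        }

        @Override
        public void write(int b) throws IOException {
            if (buffer == null) {
                throw new IOException("stream already closed or canceled");
            }
            buffer.write(b);
        }

        @Override
        public void close() throws IOException {
            // commit point: the blob becomes visible only here
            if (buffer != null) {
                store.put(key, buffer.toByteArray());
                buffer = null;
            }
        }

        @Override
        public void cancel() throws IOException {
            // discard everything written so far; nothing is published
            buffer = null;
        }
    }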
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
new file mode 100644
index 0000000..1c10c40
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.blobstore;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.security.auth.Subject;
+
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
+
+/**
+ * Provides a way to store blobs that can be downloaded.
+ * Blobs must be able to be uploaded and listed from Nimbus,
+ * and downloaded from the Supervisors. It is a key-value based
+ * store, where the key is a string and the value is the blob data.
+ *
+ * ACL checking must take place against the provided subject.
+ * If the blob store does not support security, it must validate
+ * that all ACLs are set to WORLD, everything.
+ *
+ * The users can upload their blobs through the blob store command
+ * line. The command line also allows us to update and delete blobs.
+ *
+ * Modifying the replication factor only works for HdfsBlobStore,
+ * as for the LocalFsBlobStore the replication is dependent on
+ * the number of Nimbus nodes available.
+ */
+public abstract class BlobStore implements Shutdownable {
+ private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);
+ private static final Pattern KEY_PATTERN = Pattern.compile("^[\\w \\t\\.:_-]+$");
+ protected static final String BASE_BLOBS_DIR_NAME = "blobs";
+
+ /**
+ * Allows us to initialize the blob store
+ * @param conf The storm configuration
+ * @param baseDir The directory path to store the blobs
+ * @param nimbusInfo Contains the nimbus host, port and leadership information.
+ */
+ public abstract void prepare(Map conf, String baseDir, NimbusInfo nimbusInfo);
+
+ /**
+ * Creates the blob.
+ * @param key Key for the blob.
+ * @param meta Metadata which contains the acls information
+ * @param who Is the subject creating the blob.
+ * @return AtomicOutputStream returns a stream into which the data
+ * can be written.
+ * @throws AuthorizationException
+ * @throws KeyAlreadyExistsException
+ */
+ public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException;
+
+ /**
+ * Updates the blob data.
+ * @param key Key for the blob.
+ * @param who Is the subject having the write privilege for the blob.
+ * @return AtomicOutputStream returns a stream into which the data
+ * can be written.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Gets the current version of metadata for a blob
+ * to be viewed by the user or downloaded by the supervisor.
+ * @param key Key for the blob.
+ * @param who Is the subject having the read privilege for the blob.
+ * @return ReadableBlobMeta containing the metadata and version
+ * information for the blob.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Sets the metadata with renewed acls for the blob.
+ * @param key Key for the blob.
+ * @param meta Metadata which contains the updated
+ * acls information.
+ * @param who Is the subject having the write privilege for the blob.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Deletes the blob data and metadata.
+ * @param key Key for the blob.
+ * @param who Is the subject having write privilege for the blob.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Gets the InputStream to read the blob details
+ * @param key Key for the blob.
+ * @param who Is the subject having the read privilege for the blob.
+ * @return InputStreamWithMeta has the additional
+ * file length and version information.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ */
+ public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+ /**
+ * Returns an iterator over all of the keys
+ * currently available in the blob store.
+ * @return Iterator over the blob keys.
+ */
+ public abstract Iterator<String> listKeys();
+
+ /**
+ * Gets the replication factor of the blob.
+ * @param key Key for the blob.
+ * @param who Is the subject having the read privilege for the blob.
+ * @return the replication factor for the blob.
+ * @throws Exception
+ */
+ public abstract int getBlobReplication(String key, Subject who) throws Exception;
+
+ /**
+ * Modifies the replication factor of the blob.
+ * @param key Key for the blob.
+ * @param replication The replication factor to set for the blob.
+ * @param who Is the subject having the update privilege for the blob
+ * @return the updated replication factor for the blob.
+ * @throws AuthorizationException
+ * @throws KeyNotFoundException
+ * @throws IOException
+ */
+ public abstract int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException, IOException;
+
+ /**
+ * Filters keys based on the KeyFilter
+ * passed as the argument.
+ * @param filter KeyFilter deciding which keys to keep.
+ * @param <R> The type each kept key is mapped to.
+ * @return Set of filtered keys.
+ */
+ public <R> Set<R> filterAndListKeys(KeyFilter<R> filter) {
+ Set<R> ret = new HashSet<R>();
+ Iterator<String> keys = listKeys();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ R filtered = filter.filter(key);
+ if (filtered != null) {
+ ret.add(filtered);
+ }
+ }
+ return ret;
+ }
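+
+ // A minimal KeyFilter sketch: return a non-null mapped value to keep a key,
+ // null to drop it. The "conf-" prefix below is hypothetical.
+ //
+ //   KeyFilter<String> confKeys = key -> key.startsWith("conf-") ? key : null;
+ //   Set<String> matched = store.filterAndListKeys(confKeys);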
+
+ /**
+ * Validates the key, checking for potentially harmful patterns.
+ * @param key Key for the blob.
+ * @throws AuthorizationException if the key is empty or invalid.
+ */
+ public static final void validateKey(String key) throws AuthorizationException {
+ if (StringUtils.isEmpty(key) || "..".equals(key) || ".".equals(key) || !KEY_PATTERN.matcher(key).matches()) {
+ LOG.error("'{}' does not appear to be a valid blob key (must match {})", key, KEY_PATTERN);
+ throw new AuthorizationException(key + " does not appear to be a valid blob key");
+ }
+ }
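+
+ // For illustration, assuming KEY_PATTERN (defined elsewhere in this class)
+ // rejects path separators, a traversal-style key fails validation:
+ //
+ //   BlobStore.validateKey("topology-1-stormcode.ser"); // passes
+ //   BlobStore.validateKey("../etc/passwd");            // throws AuthorizationException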
+
+ /**
+ * Wrapper called to create the blob which contains
+ * the given byte data.
+ * @param key Key for the blob.
+ * @param data Byte data that needs to be uploaded.
+ * @param meta Metadata which contains the acls information
+ * @param who Is the subject creating the blob.
+ * @throws AuthorizationException
+ * @throws KeyAlreadyExistsException
+ * @throws IOException
+ */
+ public void createBlob(String key, byte [] data, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
+ AtomicOutputStream out = null;
+ try {
+ out = createBlob(key, meta, who);
+ out.write(data);
+ out.close();
+ out = null;
+ } finally {
+ if (out != null) {
+ out.cancel();
+ }
+ }
+ }
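+
+ // Usage sketch of the wrapper above ('acls' and 'subject' are hypothetical):
+ //
+ //   store.createBlob("greeting", "hello".getBytes(StandardCharsets.UTF_8),
+ //           new SettableBlobMeta(acls), subject);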
+
+ /**
+ * Wrapper called to create the blob from the data
+ * read off the given InputStream.
+ * @param key Key for the blob.
+ * @param in InputStream from which the data is read to be
+ * written as a part of the blob.
+ * @param meta Metadata which contains the acls information
+ * @param who Is the subject creating the blob.
+ * @throws AuthorizationException
+ * @throws KeyAlreadyExistsException
+ * @throws IOException
+ */
+ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
+ AtomicOutputStream out = null;
+ try {
+ out = createBlob(key, meta, who);
+ byte[] buffer = new byte[2048];
+ int len = 0;
+ while ((len = in.read(buffer)) > 0) {
+ out.write(buffer, 0, len);
+ }
+ out.close();
+ } catch (AuthorizationException | IOException | RuntimeException e) {
+ if (out != null) {
+ out.cancel();
+ }
+ throw e;
+ } finally {
+ in.close();
+ }
+ }
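+
+ // Usage sketch: the method closes the supplied stream itself, so no
+ // try-with-resources is needed around it (the file name is hypothetical):
+ //
+ //   store.createBlob("conf.yaml", new FileInputStream("conf.yaml"),
+ //           new SettableBlobMeta(acls), subject);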
+
+ /**
+ * Reads the blob from the blob store
+ * and writes it into the output stream.
+ * @param key Key for the blob.
+ * @param out Output stream
+ * @param who Is the subject having read
+ * privilege for the blob.
+ * @throws IOException
+ * @throws KeyNotFoundException
+ * @throws AuthorizationException
+ */
+ public void readBlobTo(String key, OutputStream out, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
+ InputStreamWithMeta in = getBlob(key, who);
+ if (in == null) {
+ throw new IOException("Could not find " + key);
+ }
+ byte[] buffer = new byte[2048];
+ int len = 0;
+ try {
+ while ((len = in.read(buffer)) > 0) {
+ out.write(buffer, 0, len);
+ }
+ } finally {
+ in.close();
+ out.flush();
+ }
+ }
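+
+ // Sketch: stream a blob into a local file ('subject' and the file name are
+ // hypothetical; the method closes the blob-side stream itself):
+ //
+ //   try (FileOutputStream fos = new FileOutputStream("blob.bin")) {
+ //       store.readBlobTo("my-key", fos, subject);
+ //   }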
+
+ /**
+ * Wrapper around readBlobTo which reads
+ * the entire blob into a byte array.
+ * @param key Key for the blob.
+ * @param who Is the subject having
+ * the read privilege for the blob.
+ * @return byte array containing the blob data.
+ * @throws IOException
+ * @throws KeyNotFoundException
+ * @throws AuthorizationException
+ */
+ public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ readBlobTo(key, out, who);
+ byte[] bytes = out.toByteArray();
+ out.close();
+ return bytes;
+ }
+
+ /**
+ * Helper method to read a stored topology
+ * @param topoId the id of the topology to read
+ * @param who who to read it as
+ * @return the deserialized topology.
+ * @throws IOException on any error while reading the blob.
+ * @throws AuthorizationException if who is not allowed to read the blob
+ * @throws KeyNotFoundException if the blob could not be found
+ */
+ public StormTopology readTopology(String topoId, Subject who) throws KeyNotFoundException, AuthorizationException, IOException {
+ return Utils.deserialize(readBlob(ConfigUtils.masterStormCodeKey(topoId), who), StormTopology.class);
+ }
+
+ /**
+ * Helper method to read a stored topology config
+ * @param topoId the id of the topology whose conf we are reading
+ * @param who who we are reading this as
+ * @return the deserialized config
+ * @throws KeyNotFoundException if the blob could not be found
+ * @throws AuthorizationException if who is not allowed to read the blob
+ * @throws IOException on any error while reading the blob.
+ */
+ public Map<String, Object> readTopologyConf(String topoId, Subject who) throws KeyNotFoundException, AuthorizationException, IOException {
+ return Utils.fromCompressedJsonConf(readBlob(ConfigUtils.masterStormConfKey(topoId), who));
+ }
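+
+ // Sketch tying the two helpers together (the topology id is hypothetical):
+ //
+ //   StormTopology topo = store.readTopology("wordcount-1-1490000000", subject);
+ //   Map<String, Object> conf = store.readTopologyConf("wordcount-1-1490000000", subject);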
+
+ private static final KeyFilter<String> TO_TOPO_ID = (key) -> ConfigUtils.getIdFromBlobKey(key);
+ /**
+ * @return a set of all of the topology ids with special data stored in the
+ * blob store.
+ */
+ public Set<String> storedTopoIds() {
+ return filterAndListKeys(TO_TOPO_ID);
+ }
+
+ /**
+ * Output stream implementation used for writing the
+ * metadata and blob data.
+ */
+ protected class BlobStoreFileOutputStream extends AtomicOutputStream {
+ private BlobStoreFile part;
+ private OutputStream out;
+
+ public BlobStoreFileOutputStream(BlobStoreFile part) throws IOException {
+ this.part = part;
+ this.out = part.getOutputStream();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ //close means commit
+ out.close();
+ part.commit();
+ } catch (IOException | RuntimeException e) {
+ cancel();
+ throw e;
+ }
+ }
+
+ @Override
+ public void cancel() throws IOException {
+ try {
+ out.close();
+ } finally {
+ part.cancel();
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int offset, int len) throws IOException {
+ out.write(b, offset, len);
+ }
+ }
+
+ /**
+ * Input stream implementation used for reading
+ * both the metadata containing the acl information
+ * and the blob data.
+ */
+ protected class BlobStoreFileInputStream extends InputStreamWithMeta {
+ private BlobStoreFile part;
+ private InputStream in;
+
+ public BlobStoreFileInputStream(BlobStoreFile part) throws IOException {
+ this.part = part;
+ this.in = part.getInputStream();
+ }
+
+ @Override
+ public long getVersion() throws IOException {
+ return part.getModTime();
+ }
+
+ @Override
+ public int read() throws IOException {
+ return in.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return in.read(b, off, len);
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return in.read(b);
+ }
+
+ @Override
+ public int available() throws IOException {
+ return in.available();
+ }
+
+ @Override
+ public long getFileLength() throws IOException {
+ return part.getFileLength();
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+ }
+
+ /**
+ * Iterator used by blob store implementations to list
+ * blobs: it keeps only keys with a given prefix and strips
+ * that prefix from the keys it returns.
+ */
+ public static class KeyTranslationIterator implements Iterator<String> {
+ private Iterator<String> it = null;
+ private String next = null;
+ private String prefix = null;
+
+ public KeyTranslationIterator(Iterator<String> it, String prefix) throws IOException {
+ this.it = it;
+ this.prefix = prefix;
+ primeNext();
+ }
+
+ private void primeNext() {
+ next = null;
+ while (it.hasNext()) {
+ String tmp = it.next();
+ if (tmp.startsWith(prefix)) {
+ next = tmp.substring(prefix.length());
+ return;
+ }
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ @Override
+ public String next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ String current = next;
+ primeNext();
+ return current;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Delete Not Supported");
+ }
+ }
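+
+ // Behavior sketch: given raw keys {"topology-a", "topology-b", "other"} and
+ // prefix "topology-", the iterator yields "a" then "b", silently skipping
+ // keys without the prefix.
+ //
+ //   Iterator<String> raw = Arrays.asList("topology-a", "topology-b", "other").iterator();
+ //   Iterator<String> translated = new KeyTranslationIterator(raw, "topology-");
+ //   // translated.next() -> "a"; translated.next() -> "b"; hasNext() -> false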
+}