You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:26 UTC
[48/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/Constants.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/Constants.java b/jstorm-core/src/main/java/backtype/storm/Constants.java
index 2797b69..da2d5b7 100755
--- a/jstorm-core/src/main/java/backtype/storm/Constants.java
+++ b/jstorm-core/src/main/java/backtype/storm/Constants.java
@@ -20,9 +20,8 @@ package backtype.storm;
import backtype.storm.coordination.CoordinatedBolt;
import clojure.lang.RT;
-
public class Constants {
- public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
+ public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
public static final long SYSTEM_TASK_ID = -1;
public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
@@ -32,6 +31,6 @@ public class Constants {
public static final String METRICS_STREAM_ID = "__metrics";
public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
public static final String CREDENTIALS_CHANGED_STREAM_ID = "__credentials";
-
+
public static final String JSTORM_CONF_DIR = "JSTORM_CONF_DIR";
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java b/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java
index 9319ce1..88adfb4 100755
--- a/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java
+++ b/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java
@@ -108,94 +108,92 @@ import org.yaml.snakeyaml.Yaml;
public class GenericOptionsParser {
static final Logger LOG = LoggerFactory.getLogger(GenericOptionsParser.class);
-
+
static final Charset UTF8 = Charset.forName("UTF-8");
-
+
public static final String TOPOLOGY_LIB_PATH = "topology.lib.path";
-
+
public static final String TOPOLOGY_LIB_NAME = "topology.lib.name";
-
+
Config conf;
-
+
CommandLine commandLine;
-
+
// Order in this map is important for these purposes:
// - configuration priority
static final LinkedHashMap<String, OptionProcessor> optionProcessors = new LinkedHashMap<String, OptionProcessor>();
-
+
public GenericOptionsParser(Config conf, String[] args) throws ParseException {
this(conf, new Options(), args);
}
-
+
public GenericOptionsParser(Config conf, Options options, String[] args) throws ParseException {
this.conf = conf;
parseGeneralOptions(options, conf, args);
}
-
+
public String[] getRemainingArgs() {
return commandLine.getArgs();
}
-
+
public Config getConfiguration() {
return conf;
}
-
+
static Options buildGeneralOptions(Options opts) {
Options r = new Options();
-
+
for (Object o : opts.getOptions())
r.addOption((Option) o);
-
- Option libjars = OptionBuilder.withArgName("paths").hasArg().withDescription("comma separated jars to be used by the submitted topology").create("libjars");
+
+ Option libjars =
+ OptionBuilder.withArgName("paths").hasArg().withDescription("comma separated jars to be used by the submitted topology").create("libjars");
r.addOption(libjars);
optionProcessors.put("libjars", new LibjarsProcessor());
-
+
Option conf = OptionBuilder.withArgName("configuration file").hasArg().withDescription("an application configuration file").create("conf");
r.addOption(conf);
optionProcessors.put("conf", new ConfFileProcessor());
-
+
// Must come after `conf': this option is of higher priority
Option extraConfig = OptionBuilder.withArgName("D").hasArg().withDescription("extra configurations (preserving types)").create("D");
r.addOption(extraConfig);
optionProcessors.put("D", new ExtraConfigProcessor());
-
+
return r;
}
-
+
void parseGeneralOptions(Options opts, Config conf, String[] args) throws ParseException {
opts = buildGeneralOptions(opts);
CommandLineParser parser = new GnuParser();
commandLine = parser.parse(opts, args, true);
processGeneralOptions(conf, commandLine);
}
-
+
void processGeneralOptions(Config conf, CommandLine commandLine) throws ParseException {
for (Map.Entry<String, OptionProcessor> e : optionProcessors.entrySet())
if (commandLine.hasOption(e.getKey()))
e.getValue().process(conf, commandLine);
}
-
+
static List<File> validateFiles(String pathList) throws IOException {
List<File> l = new ArrayList<File>();
-
+
for (String s : pathList.split(",")) {
File file = new File(s);
if (!file.exists())
throw new FileNotFoundException("File `" + file.getAbsolutePath() + "' does not exist");
-
+
l.add(file);
}
-
+
return l;
}
-
+
public static void printGenericCommandUsage(PrintStream out) {
String[] strs =
- new String[] {
- "Generic options supported are",
- " -conf <conf.xml> load configurations from",
- " <conf.xml>",
- " -conf <conf.yaml> load configurations from",
+ new String[] { "Generic options supported are", " -conf <conf.xml> load configurations from",
+ " <conf.xml>", " -conf <conf.yaml> load configurations from",
" <conf.yaml>",
" -D <key>=<value> set <key> in configuration",
" to <value> (preserve value's type)",
@@ -205,11 +203,11 @@ public class GenericOptionsParser {
for (String s : strs)
out.println(s);
}
-
+
static interface OptionProcessor {
public void process(Config conf, CommandLine commandLine) throws ParseException;
}
-
+
static class LibjarsProcessor implements OptionProcessor {
@Override
public void process(Config conf, CommandLine commandLine) throws ParseException {
@@ -223,31 +221,31 @@ public class GenericOptionsParser {
}
conf.put(TOPOLOGY_LIB_PATH, jars);
conf.put(TOPOLOGY_LIB_NAME, names);
-
+
} catch (IOException e) {
throw new ParseException(e.getMessage());
}
}
}
-
+
static class ExtraConfigProcessor implements OptionProcessor {
static final Yaml yaml = new Yaml();
-
+
@Override
public void process(Config conf, CommandLine commandLine) throws ParseException {
for (String s : commandLine.getOptionValues("D")) {
String[] keyval = s.split("=", 2);
if (keyval.length != 2)
throw new ParseException("Invalid option value `" + s + "'");
-
+
conf.putAll((Map) yaml.load(keyval[0] + ": " + keyval[1]));
}
}
}
-
+
static class ConfFileProcessor implements OptionProcessor {
static final Yaml yaml = new Yaml();
-
+
static Map loadYamlConf(String f) throws IOException {
InputStreamReader reader = null;
try {
@@ -259,13 +257,13 @@ public class GenericOptionsParser {
reader.close();
}
}
-
+
static Map loadConf(String f) throws IOException {
if (f.endsWith(".yaml"))
return loadYamlConf(f);
throw new IOException("Unknown configuration file type: " + f + " does not end with either .yaml");
}
-
+
@Override
public void process(Config conf, CommandLine commandLine) throws ParseException {
try {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java b/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java
index 1a7bc1b..f8f9e9b 100755
--- a/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java
+++ b/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java
@@ -26,7 +26,8 @@ import java.util.Map;
public interface ICredentialsListener {
/**
* Called when the credentials of a topology have changed.
+ *
* @param credentials the new credentials, could be null.
*/
- public void setCredentials(Map<String,String> credentials);
+ public void setCredentials(Map<String, String> credentials);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java b/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java
index 7d5aa35..7d31f07 100755
--- a/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java
+++ b/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java
@@ -30,20 +30,33 @@ import backtype.storm.generated.Credentials;
import java.util.Map;
-
public interface ILocalCluster {
void submitTopology(String topologyName, Map conf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException;
- void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) throws AlreadyAliveException, InvalidTopologyException;
+
+ void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) throws AlreadyAliveException,
+ InvalidTopologyException;
+
void uploadNewCredentials(String topologyName, Credentials creds);
+
void killTopology(String topologyName) throws NotAliveException;
+
void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException;
+
void activate(String topologyName) throws NotAliveException;
+
void deactivate(String topologyName) throws NotAliveException;
+
void rebalance(String name, RebalanceOptions options) throws NotAliveException;
+
void shutdown();
+
String getTopologyConf(String id);
+
StormTopology getTopology(String id);
+
ClusterSummary getClusterInfo();
+
TopologyInfo getTopologyInfo(String id);
+
Map getState();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java b/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java
index e478dca..4482ecd 100755
--- a/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java
+++ b/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java
@@ -21,7 +21,6 @@ import backtype.storm.daemon.Shutdownable;
import backtype.storm.generated.DistributedRPC;
import backtype.storm.generated.DistributedRPCInvocations;
-
public interface ILocalDRPC extends DistributedRPC.Iface, DistributedRPCInvocations.Iface, Shutdownable {
- public String getServiceId();
+ public String getServiceId();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/LocalCluster.java b/jstorm-core/src/main/java/backtype/storm/LocalCluster.java
index b55bac4..c25c260 100755
--- a/jstorm-core/src/main/java/backtype/storm/LocalCluster.java
+++ b/jstorm-core/src/main/java/backtype/storm/LocalCluster.java
@@ -17,30 +17,21 @@
*/
package backtype.storm;
-import java.util.Map;
-
+import backtype.storm.generated.*;
+import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.utils.JStormUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.Credentials;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyInfo;
-import backtype.storm.utils.Utils;
-
-import com.alibaba.jstorm.utils.JStormUtils;
+import java.util.Map;
public class LocalCluster implements ILocalCluster {
-
+
public static Logger LOG = LoggerFactory.getLogger(LocalCluster.class);
-
+
private LocalClusterMap state;
-
+
protected void setLogger() {
// the code is for log4j
// boolean needReset = true;
@@ -56,61 +47,62 @@ public class LocalCluster implements ILocalCluster {
// BasicConfigurator.configure();
// rootLogger.setLevel(Level.INFO);
// }
-
+
}
-
+
// this is easy to debug
protected static LocalCluster instance = null;
-
+
public static LocalCluster getInstance() {
return instance;
}
-
+
public LocalCluster() {
synchronized (LocalCluster.class) {
if (instance != null) {
throw new RuntimeException("LocalCluster should be single");
}
setLogger();
-
+
// fix in zk occur Address family not supported by protocol family:
// connect
System.setProperty("java.net.preferIPv4Stack", "true");
-
+
this.state = LocalUtils.prepareLocalCluster();
if (this.state == null)
throw new RuntimeException("prepareLocalCluster error");
-
+
instance = this;
}
}
-
+
@Override
public void submitTopology(String topologyName, Map conf, StormTopology topology) {
submitTopologyWithOpts(topologyName, conf, topology, null);
}
-
+
@Override
public void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) {
// TODO Auto-generated method stub
if (!Utils.isValidConf(conf))
throw new RuntimeException("Topology conf is not json-serializable");
JStormUtils.setLocalMode(true);
-
+ conf.put(Config.STORM_CLUSTER_MODE, "local");
+
try {
if (submitOpts == null) {
state.getNimbus().submitTopology(topologyName, null, Utils.to_json(conf), topology);
} else {
state.getNimbus().submitTopologyWithOpts(topologyName, null, Utils.to_json(conf), topology, submitOpts);
}
-
+
} catch (Exception e) {
// TODO Auto-generated catch block
LOG.error("Failed to submit topology " + topologyName, e);
throw new RuntimeException(e);
}
}
-
+
@Override
public void killTopology(String topologyName) {
// TODO Auto-generated method stub
@@ -124,7 +116,7 @@ public class LocalCluster implements ILocalCluster {
LOG.error("fail to kill Topology " + topologyName, e);
}
}
-
+
@Override
public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException {
// TODO Auto-generated method stub
@@ -136,7 +128,7 @@ public class LocalCluster implements ILocalCluster {
throw new RuntimeException(e);
}
}
-
+
@Override
public void activate(String topologyName) {
// TODO Auto-generated method stub
@@ -148,7 +140,7 @@ public class LocalCluster implements ILocalCluster {
throw new RuntimeException(e);
}
}
-
+
@Override
public void deactivate(String topologyName) {
// TODO Auto-generated method stub
@@ -160,7 +152,7 @@ public class LocalCluster implements ILocalCluster {
throw new RuntimeException(e);
}
}
-
+
@Override
public void rebalance(String name, RebalanceOptions options) {
// TODO Auto-generated method stub
@@ -172,7 +164,7 @@ public class LocalCluster implements ILocalCluster {
throw new RuntimeException(e);
}
}
-
+
@Override
public void shutdown() {
// TODO Auto-generated method stub
@@ -180,8 +172,9 @@ public class LocalCluster implements ILocalCluster {
// it take 10 seconds to remove topology's node
JStormUtils.sleepMs(10 * 1000);
this.state.clean();
+ instance = null;
}
-
+
@Override
public String getTopologyConf(String id) {
// TODO Auto-generated method stub
@@ -193,7 +186,7 @@ public class LocalCluster implements ILocalCluster {
}
return null;
}
-
+
@Override
public StormTopology getTopology(String id) {
// TODO Auto-generated method stub
@@ -208,7 +201,7 @@ public class LocalCluster implements ILocalCluster {
}
return null;
}
-
+
@Override
public ClusterSummary getClusterInfo() {
// TODO Auto-generated method stub
@@ -220,7 +213,7 @@ public class LocalCluster implements ILocalCluster {
}
return null;
}
-
+
@Override
public TopologyInfo getTopologyInfo(String id) {
// TODO Auto-generated method stub
@@ -235,7 +228,7 @@ public class LocalCluster implements ILocalCluster {
}
return null;
}
-
+
/***
* You should use getLocalClusterMap() to instead.This function will always return null
* */
@@ -245,11 +238,11 @@ public class LocalCluster implements ILocalCluster {
// TODO Auto-generated method stub
return null;
}
-
+
public LocalClusterMap getLocalClusterMap() {
return state;
}
-
+
public static void main(String[] args) throws Exception {
LocalCluster localCluster = null;
try {
@@ -269,7 +262,7 @@ public class LocalCluster implements ILocalCluster {
} catch (Exception e) {
// TODO Auto-generated catch block
LOG.error("fail to uploadNewCredentials of topologyId: " + topologyName, e);
- }
+ }
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java b/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java
index bd99c76..39f23be 100755
--- a/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java
+++ b/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java
@@ -31,83 +31,83 @@ import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.zk.Factory;
public class LocalClusterMap {
-
+
public static Logger LOG = LoggerFactory.getLogger(LocalClusterMap.class);
-
+
private NimbusServer nimbusServer;
-
+
private ServiceHandler nimbus;
-
+
private Factory zookeeper;
-
+
private Map conf;
-
+
private List<String> tmpDir;
-
+
private SupervisorManger supervisor;
-
+
public ServiceHandler getNimbus() {
return nimbus;
}
-
+
public void setNimbus(ServiceHandler nimbus) {
this.nimbus = nimbus;
}
-
+
public Factory getZookeeper() {
return zookeeper;
}
-
+
public void setZookeeper(Factory zookeeper) {
this.zookeeper = zookeeper;
}
-
+
public Map getConf() {
return conf;
}
-
+
public void setConf(Map conf) {
this.conf = conf;
}
-
+
public NimbusServer getNimbusServer() {
return nimbusServer;
}
-
+
public void setNimbusServer(NimbusServer nimbusServer) {
this.nimbusServer = nimbusServer;
}
-
+
public SupervisorManger getSupervisor() {
return supervisor;
}
-
+
public void setSupervisor(SupervisorManger supervisor) {
this.supervisor = supervisor;
}
-
+
public List<String> getTmpDir() {
return tmpDir;
}
-
+
public void setTmpDir(List<String> tmpDir) {
this.tmpDir = tmpDir;
}
-
+
public void clean() {
-
+
if (supervisor != null) {
supervisor.ShutdownAllWorkers();
supervisor.shutdown();
}
-
+
if (nimbusServer != null) {
nimbusServer.cleanup();
}
-
+
if (zookeeper != null)
zookeeper.shutdown();
-
+
// it will hava a problem:
// java.io.IOException: Unable to delete file:
// {TmpPath}\{UUID}\version-2\log.1
@@ -122,5 +122,5 @@ public class LocalClusterMap {
}
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java b/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java
index 4113bf4..a838026 100755
--- a/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java
+++ b/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java
@@ -28,16 +28,16 @@ import com.alibaba.jstorm.drpc.Drpc;
public class LocalDRPC implements ILocalDRPC {
private static final Logger LOG = LoggerFactory.getLogger(LocalDRPC.class);
-
+
private Drpc handler = new Drpc();
private Thread thread;
-
+
private final String serviceId;
-
+
public LocalDRPC() {
-
+
thread = new Thread(new Runnable() {
-
+
@Override
public void run() {
LOG.info("Begin to init local Drpc");
@@ -51,10 +51,10 @@ public class LocalDRPC implements ILocalDRPC {
}
});
thread.start();
-
+
serviceId = ServiceRegistry.registerService(handler);
}
-
+
@Override
public String execute(String functionName, String funcArgs) {
// TODO Auto-generated method stub
@@ -65,36 +65,36 @@ public class LocalDRPC implements ILocalDRPC {
throw new RuntimeException(e);
}
}
-
+
@Override
public void result(String id, String result) throws TException {
// TODO Auto-generated method stub
handler.result(id, result);
}
-
+
@Override
public DRPCRequest fetchRequest(String functionName) throws TException {
// TODO Auto-generated method stub
return handler.fetchRequest(functionName);
}
-
+
@Override
public void failRequest(String id) throws TException {
// TODO Auto-generated method stub
handler.failRequest(id);
}
-
+
@Override
public void shutdown() {
// TODO Auto-generated method stub
ServiceRegistry.unregisterService(this.serviceId);
this.handler.shutdown();
}
-
+
@Override
public String getServiceId() {
// TODO Auto-generated method stub
return serviceId;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/LocalUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/LocalUtils.java b/jstorm-core/src/main/java/backtype/storm/LocalUtils.java
index e32c07e..6e5023b 100755
--- a/jstorm-core/src/main/java/backtype/storm/LocalUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/LocalUtils.java
@@ -39,32 +39,32 @@ import com.alibaba.jstorm.zk.Factory;
import com.alibaba.jstorm.zk.Zookeeper;
public class LocalUtils {
-
+
public static Logger LOG = LoggerFactory.getLogger(LocalUtils.class);
-
+
public static LocalClusterMap prepareLocalCluster() {
LocalClusterMap state = new LocalClusterMap();
try {
List<String> tmpDirs = new ArrayList();
-
+
String zkDir = getTmpDir();
tmpDirs.add(zkDir);
Factory zookeeper = startLocalZookeeper(zkDir);
Map conf = getLocalConf(zookeeper.getZooKeeperServer().getClientPort());
-
+
String nimbusDir = getTmpDir();
tmpDirs.add(nimbusDir);
Map nimbusConf = deepCopyMap(conf);
nimbusConf.put(Config.STORM_LOCAL_DIR, nimbusDir);
NimbusServer instance = new NimbusServer();
-
+
Map supervisorConf = deepCopyMap(conf);
String supervisorDir = getTmpDir();
tmpDirs.add(supervisorDir);
supervisorConf.put(Config.STORM_LOCAL_DIR, supervisorDir);
Supervisor supervisor = new Supervisor();
IContext context = getLocalContext(supervisorConf);
-
+
state.setNimbusServer(instance);
state.setNimbus(instance.launcherLocalServer(nimbusConf, new DefaultInimbus()));
state.setZookeeper(zookeeper);
@@ -75,11 +75,11 @@ public class LocalUtils {
} catch (Exception e) {
LOG.error("prepare cluster error!", e);
state.clean();
-
+
}
return null;
}
-
+
private static Factory startLocalZookeeper(String tmpDir) {
for (int i = 2000; i < 65535; i++) {
try {
@@ -90,11 +90,11 @@ public class LocalUtils {
}
throw new RuntimeException("No port is available to launch an inprocess zookeeper.");
}
-
+
private static String getTmpDir() {
return System.getProperty("java.io.tmpdir") + File.separator + UUID.randomUUID();
}
-
+
private static Map getLocalConf(int port) {
List<String> zkServers = new ArrayList<String>(1);
zkServers.add("localhost");
@@ -110,7 +110,7 @@ public class LocalUtils {
ConfigExtension.setTaskCleanupTimeoutSec(conf, 0);
return conf;
}
-
+
private static IContext getLocalContext(Map conf) {
if (!(Boolean) conf.get(Config.STORM_LOCAL_MODE_ZMQ)) {
IContext result = new NettyContext();
@@ -120,7 +120,7 @@ public class LocalUtils {
}
return null;
}
-
+
private static Map deepCopyMap(Map map) {
return new HashMap(map);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java b/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java
index 400875e..1666b29 100644
--- a/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java
+++ b/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java
@@ -17,6 +17,14 @@
*/
package backtype.storm;
+import backtype.storm.generated.*;
+import backtype.storm.utils.BufferFileInputStream;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -24,26 +32,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyAssignException;
-import backtype.storm.utils.BufferFileInputStream;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
/**
- * Use this class to submit topologies to run on the Storm cluster. You should
- * run your program with the "storm jar" command from the command-line, and then
- * use this class to submit your topologies.
+ * Use this class to submit topologies to run on the Storm cluster. You should run your program with the "storm jar" command from the command-line, and then use
+ * this class to submit your topologies.
*/
public class StormSubmitter {
public static Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
@@ -55,25 +46,20 @@ public class StormSubmitter {
}
/**
- * Submits a topology to run on the cluster. A topology runs forever or
- * until explicitly killed.
+ * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
*
*
* @param name the name of the storm.
* @param stormConf the topology-specific configuration. See {@link Config}.
* @param topology the processing to execute.
- * @throws AlreadyAliveException if a topology with this name is already
- * running
+ * @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
*/
- public static void submitTopology(String name, Map stormConf,
- StormTopology topology) throws AlreadyAliveException,
- InvalidTopologyException {
+ public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
submitTopology(name, stormConf, topology, null);
}
- public static void submitTopology(String name, Map stormConf,
- StormTopology topology, SubmitOptions opts, List<File> jarFiles)
+ public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, List<File> jarFiles)
throws AlreadyAliveException, InvalidTopologyException {
if (jarFiles == null) {
jarFiles = new ArrayList<File>();
@@ -83,8 +69,7 @@ public class StormSubmitter {
for (File f : jarFiles) {
if (!f.exists()) {
- LOG.info(f.getName() + " is not existed: "
- + f.getAbsolutePath());
+ LOG.info(f.getName() + " is not existed: " + f.getAbsolutePath());
continue;
}
jars.put(f.getName(), f.getAbsolutePath());
@@ -96,32 +81,25 @@ public class StormSubmitter {
submitTopology(name, stormConf, topology, opts);
}
- public static void submitTopology(String name, Map stormConf,
- StormTopology topology, SubmitOptions opts,
- ProgressListener listener) throws AlreadyAliveException,
- InvalidTopologyException {
+ public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener listener)
+ throws AlreadyAliveException, InvalidTopologyException {
submitTopology(name, stormConf, topology, opts);
}
/**
- * Submits a topology to run on the cluster. A topology runs forever or
- * until explicitly killed.
+ * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
*
*
* @param name the name of the storm.
* @param stormConf the topology-specific configuration. See {@link Config}.
* @param topology the processing to execute.
- * @param options to manipulate the starting of the topology
- * @throws AlreadyAliveException if a topology with this name is already
- * running
+ * @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
*/
- public static void submitTopology(String name, Map stormConf,
- StormTopology topology, SubmitOptions opts)
- throws AlreadyAliveException, InvalidTopologyException {
+ public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException,
+ InvalidTopologyException {
if (!Utils.isValidConf(stormConf)) {
- throw new IllegalArgumentException(
- "Storm conf is not valid. Must be json-serializable");
+ throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
}
stormConf = new HashMap(stormConf);
stormConf.putAll(Utils.readCommandLineOpts());
@@ -137,20 +115,16 @@ public class StormSubmitter {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
if (topologyNameExists(client, conf, name)) {
- throw new RuntimeException("Topology with name `" + name
- + "` already exists on cluster");
+ throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
}
submitJar(client, conf);
- LOG.info("Submitting topology " + name
- + " in distributed mode with conf " + serConf);
+ LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
if (opts != null) {
- client.getClient().submitTopologyWithOpts(name, path,
- serConf, topology, opts);
+ client.getClient().submitTopologyWithOpts(name, path, serConf, topology, opts);
} else {
// this is for backwards compatibility
- client.getClient().submitTopology(name, path, serConf,
- topology);
+ client.getClient().submitTopology(name, path, serConf, topology);
}
} finally {
client.close();
@@ -173,43 +147,36 @@ public class StormSubmitter {
}
/**
- * Submits a topology to run on the cluster with a progress bar. A topology
- * runs forever or until explicitly killed.
+ * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until explicitly killed.
*
*
* @param name the name of the storm.
* @param stormConf the topology-specific configuration. See {@link Config}.
* @param topology the processing to execute.
- * @throws AlreadyAliveException if a topology with this name is already
- * running
+ * @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
* @throws TopologyAssignException
*/
- public static void submitTopologyWithProgressBar(String name,
- Map stormConf, StormTopology topology)
- throws AlreadyAliveException, InvalidTopologyException {
+ public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
submitTopologyWithProgressBar(name, stormConf, topology, null);
}
/**
- * Submits a topology to run on the cluster with a progress bar. A topology
- * runs forever or until explicitly killed.
+ * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until explicitly killed.
*
*
* @param name the name of the storm.
* @param stormConf the topology-specific configuration. See {@link Config}.
* @param topology the processing to execute.
* @param opts to manipulate the starting of the topology
- * @throws AlreadyAliveException if a topology with this name is already
- * running
+ * @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
* @throws TopologyAssignException
*/
- public static void submitTopologyWithProgressBar(String name,
- Map stormConf, StormTopology topology, SubmitOptions opts)
- throws AlreadyAliveException, InvalidTopologyException {
+ public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException,
+ InvalidTopologyException {
/**
* remove progress bar in jstorm
@@ -218,21 +185,11 @@ public class StormSubmitter {
}
public static boolean topologyNameExists(NimbusClient client, Map conf, String name) {
- if (StringUtils.isBlank(name)) {
- throw new RuntimeException("TopologyName is empty");
- }
-
try {
- String topologyId = client.getClient().getTopologyId(name);
- if (StringUtils.isBlank(topologyId) == false) {
- return true;
- }
- return false;
-
- } catch (NotAliveException e) {
- return false;
+ client.getClient().getTopologyInfoByName(name);
+ return true;
} catch (Exception e) {
- throw new RuntimeException(e);
+ return false;
}
}
@@ -246,15 +203,9 @@ public class StormSubmitter {
String localJar = System.getProperty("storm.jar");
path = client.getClient().beginFileUpload();
String[] pathCache = path.split("/");
- String uploadLocation =
- path + "/stormjar-" + pathCache[pathCache.length - 1]
- + ".jar";
- List<String> lib =
- (List<String>) conf
- .get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
- Map<String, String> libPath =
- (Map<String, String>) conf
- .get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
+ String uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar";
+ List<String> lib = (List<String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
+ Map<String, String> libPath = (Map<String, String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
if (lib != null && lib.size() != 0) {
for (String libName : lib) {
String jarPath = path + "/lib/" + libName;
@@ -265,14 +216,12 @@ public class StormSubmitter {
} else {
if (localJar == null) {
// no lib, no client jar
- throw new RuntimeException(
- "No client app jar, please upload it");
+ throw new RuntimeException("No client app jar, please upload it");
}
}
if (localJar != null) {
- submittedJar =
- submitJar(conf, localJar, uploadLocation, client);
+ submittedJar = submitJar(conf, localJar, uploadLocation, client);
} else {
// no client jar, but with lib jar
client.getClient().finishFileUpload(uploadLocation);
@@ -285,36 +234,29 @@ public class StormSubmitter {
}
}
- public static String submitJar(Map conf, String localJar,
- String uploadLocation, NimbusClient client) {
+ public static String submitJar(Map conf, String localJar, String uploadLocation, NimbusClient client) {
if (localJar == null) {
- throw new RuntimeException(
- "Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+ throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
}
try {
- LOG.info("Uploading topology jar " + localJar
- + " to assigned location: " + uploadLocation);
+ LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
int bufferSize = 512 * 1024;
- Object maxBufSizeObject =
- conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE);
+ Object maxBufSizeObject = conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE);
if (maxBufSizeObject != null) {
bufferSize = Utils.getInt(maxBufSizeObject) / 2;
}
- BufferFileInputStream is =
- new BufferFileInputStream(localJar, bufferSize);
+ BufferFileInputStream is = new BufferFileInputStream(localJar, bufferSize);
while (true) {
byte[] toSubmit = is.read();
if (toSubmit.length == 0)
break;
- client.getClient().uploadChunk(uploadLocation,
- ByteBuffer.wrap(toSubmit));
+ client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
}
client.getClient().finishFileUpload(uploadLocation);
- LOG.info("Successfully uploaded topology jar to assigned location: "
- + uploadLocation);
+ LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
return uploadLocation;
} catch (Exception e) {
throw new RuntimeException(e);
@@ -350,8 +292,7 @@ public class StormSubmitter {
* @param bytesUploaded - number of bytes transferred so far
* @param totalBytes - total number of bytes of the file
*/
- public void onProgress(String srcFile, String targetFile,
- long bytesUploaded, long totalBytes);
+ public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes);
/**
* called when the file is uploaded
@@ -360,7 +301,6 @@ public class StormSubmitter {
* @param targetFile - destination file
* @param totalBytes - total number of bytes of the file
*/
- public void onCompleted(String srcFile, String targetFile,
- long totalBytes);
+ public void onCompleted(String srcFile, String targetFile, long totalBytes);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/Tool.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/Tool.java b/jstorm-core/src/main/java/backtype/storm/Tool.java
index 6722b24..0dc5e32 100755
--- a/jstorm-core/src/main/java/backtype/storm/Tool.java
+++ b/jstorm-core/src/main/java/backtype/storm/Tool.java
@@ -58,13 +58,13 @@ package backtype.storm;
public abstract class Tool {
Config config;
-
+
public abstract int run(String[] args) throws Exception;
-
+
public Config getConf() {
return config;
}
-
+
public void setConf(Config config) {
this.config = config;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ToolRunner.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/ToolRunner.java b/jstorm-core/src/main/java/backtype/storm/ToolRunner.java
index 33f5034..d70da41 100755
--- a/jstorm-core/src/main/java/backtype/storm/ToolRunner.java
+++ b/jstorm-core/src/main/java/backtype/storm/ToolRunner.java
@@ -32,6 +32,8 @@ import backtype.storm.utils.Utils;
* href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a
* href="{@docRoot}
* to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot}
+ * to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a
+ * href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot}
* /backtype/storm/GenericOptionsParser.html#GenericOptions"> generic storm command line arguments</a> and modifies the <code>Config</code> of the
* <code>Tool</code>. The application-specific options are passed along without being modified.
*
@@ -41,21 +43,22 @@ import backtype.storm.utils.Utils;
public class ToolRunner {
static final Logger LOG = LoggerFactory.getLogger(ToolRunner.class);
-
+
public static void run(Tool tool, String[] args) {
run(tool.getConf(), tool, args);
}
-
+
public static void run(Config conf, Tool tool, String[] args) {
try {
if (conf == null) {
conf = new Config();
conf.putAll(Utils.readStormConfig());
}
-
+
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
+ LOG.info(conf.toString());
tool.setConf(conf);
-
+
System.exit(tool.run(parser.getRemainingArgs()));
} catch (ParseException e) {
LOG.error("Error parsing generic options: {}", e.getMessage());
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java b/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java
index 5de9bde..d3d1d37 100755
--- a/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java
@@ -36,15 +36,14 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-
public class ClojureBolt implements IRichBolt, FinishedCallback {
Map<String, StreamInfo> _fields;
List<String> _fnSpec;
List<String> _confSpec;
List<Object> _params;
-
+
IBolt _bolt;
-
+
public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
_fnSpec = fnSpec;
_confSpec = confSpec;
@@ -57,21 +56,23 @@ public class ClojureBolt implements IRichBolt, FinishedCallback {
IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
try {
IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
- final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
- Keyword.intern(Symbol.create("output-collector")), collector,
- Keyword.intern(Symbol.create("context")), context});
- List<Object> args = new ArrayList<Object>() {{
- add(stormConf);
- add(context);
- add(collectorMap);
- }};
-
+ final Map<Keyword, Object> collectorMap =
+ new PersistentArrayMap(new Object[] { Keyword.intern(Symbol.create("output-collector")), collector,
+ Keyword.intern(Symbol.create("context")), context });
+ List<Object> args = new ArrayList<Object>() {
+ {
+ add(stormConf);
+ add(context);
+ add(collectorMap);
+ }
+ };
+
_bolt = (IBolt) preparer.applyTo(RT.seq(args));
- //this is kind of unnecessary for clojure
+ // this is kind of unnecessary for clojure
try {
_bolt.prepare(stormConf, context, collector);
- } catch(AbstractMethodError ame) {
-
+ } catch (AbstractMethodError ame) {
+
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -85,16 +86,16 @@ public class ClojureBolt implements IRichBolt, FinishedCallback {
@Override
public void cleanup() {
- try {
- _bolt.cleanup();
- } catch(AbstractMethodError ame) {
-
- }
+ try {
+ _bolt.cleanup();
+ } catch (AbstractMethodError ame) {
+
+ }
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for(String stream: _fields.keySet()) {
+ for (String stream : _fields.keySet()) {
StreamInfo info = _fields.get(stream);
declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
}
@@ -102,7 +103,7 @@ public class ClojureBolt implements IRichBolt, FinishedCallback {
@Override
public void finishedId(Object id) {
- if(_bolt instanceof FinishedCallback) {
+ if (_bolt instanceof FinishedCallback) {
((FinishedCallback) _bolt).finishedId(id);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java b/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java
index f6422e3..fc231ce 100755
--- a/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java
@@ -39,37 +39,38 @@ public class ClojureSpout implements IRichSpout {
List<String> _fnSpec;
List<String> _confSpec;
List<Object> _params;
-
+
ISpout _spout;
-
+
public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
_fnSpec = fnSpec;
_confSpec = confSpec;
_params = params;
_fields = fields;
}
-
@Override
public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
try {
IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
- final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
- Keyword.intern(Symbol.create("output-collector")), collector,
- Keyword.intern(Symbol.create("context")), context});
- List<Object> args = new ArrayList<Object>() {{
- add(conf);
- add(context);
- add(collectorMap);
- }};
-
+ final Map<Keyword, Object> collectorMap =
+ new PersistentArrayMap(new Object[] { Keyword.intern(Symbol.create("output-collector")), collector,
+ Keyword.intern(Symbol.create("context")), context });
+ List<Object> args = new ArrayList<Object>() {
+ {
+ add(conf);
+ add(context);
+ add(collectorMap);
+ }
+ };
+
_spout = (ISpout) preparer.applyTo(RT.seq(args));
- //this is kind of unnecessary for clojure
+ // this is kind of unnecessary for clojure
try {
_spout.open(conf, context, collector);
- } catch(AbstractMethodError ame) {
-
+ } catch (AbstractMethodError ame) {
+
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -80,8 +81,8 @@ public class ClojureSpout implements IRichSpout {
public void close() {
try {
_spout.close();
- } catch(AbstractMethodError ame) {
-
+ } catch (AbstractMethodError ame) {
+
}
}
@@ -89,8 +90,8 @@ public class ClojureSpout implements IRichSpout {
public void nextTuple() {
try {
_spout.nextTuple();
- } catch(AbstractMethodError ame) {
-
+ } catch (AbstractMethodError ame) {
+
}
}
@@ -99,8 +100,8 @@ public class ClojureSpout implements IRichSpout {
public void ack(Object msgId) {
try {
_spout.ack(msgId);
- } catch(AbstractMethodError ame) {
-
+ } catch (AbstractMethodError ame) {
+
}
}
@@ -109,20 +110,20 @@ public class ClojureSpout implements IRichSpout {
public void fail(Object msgId) {
try {
_spout.fail(msgId);
- } catch(AbstractMethodError ame) {
-
+ } catch (AbstractMethodError ame) {
+
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for(String stream: _fields.keySet()) {
+ for (String stream : _fields.keySet()) {
StreamInfo info = _fields.get(stream);
declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
}
}
-
+
@Override
public Map<String, Object> getComponentConfiguration() {
IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
@@ -137,8 +138,8 @@ public class ClojureSpout implements IRichSpout {
public void activate() {
try {
_spout.activate();
- } catch(AbstractMethodError ame) {
-
+ } catch (AbstractMethodError ame) {
+
}
}
@@ -146,8 +147,8 @@ public class ClojureSpout implements IRichSpout {
public void deactivate() {
try {
_spout.deactivate();
- } catch(AbstractMethodError ame) {
-
+ } catch (AbstractMethodError ame) {
+
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java b/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java
index a155008..53136c7 100755
--- a/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java
@@ -26,20 +26,20 @@ import java.util.Map;
public class RichShellBolt extends ShellBolt implements IRichBolt {
private Map<String, StreamInfo> _outputs;
-
+
public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) {
super(command);
_outputs = outputs;
}
-
+
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for(String stream: _outputs.keySet()) {
+ for (String stream : _outputs.keySet()) {
StreamInfo def = _outputs.get(stream);
- if(def.is_direct()) {
+ if (def.is_direct()) {
declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
} else {
- declarer.declareStream(stream, new Fields(def.get_output_fields()));
+ declarer.declareStream(stream, new Fields(def.get_output_fields()));
}
}
}
@@ -47,5 +47,5 @@ public class RichShellBolt extends ShellBolt implements IRichBolt {
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java b/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java
index b49fbef..2f7a134 100755
--- a/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java
@@ -34,9 +34,9 @@ public class RichShellSpout extends ShellSpout implements IRichSpout {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for(String stream: _outputs.keySet()) {
+ for (String stream : _outputs.keySet()) {
StreamInfo def = _outputs.get(stream);
- if(def.is_direct()) {
+ if (def.is_direct()) {
declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
} else {
declarer.declareStream(stream, new Fields(def.get_output_fields()));
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/activate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/activate.java b/jstorm-core/src/main/java/backtype/storm/command/activate.java
index ed12e09..11a0db5 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/activate.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/activate.java
@@ -30,7 +30,7 @@ import backtype.storm.utils.Utils;
*
*/
public class activate {
-
+
/**
* @param args
*/
@@ -39,17 +39,17 @@ public class activate {
if (args == null || args.length == 0) {
throw new InvalidParameterException("Should input topology name");
}
-
+
String topologyName = args[0];
-
+
NimbusClient client = null;
try {
-
+
Map conf = Utils.readStormConfig();
client = NimbusClient.getConfiguredClient(conf);
-
+
client.getClient().activate(topologyName);
-
+
System.out.println("Successfully submit command activate " + topologyName);
} catch (Exception e) {
System.out.println(e.getMessage());
@@ -61,5 +61,5 @@ public class activate {
}
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/config_value.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/config_value.java b/jstorm-core/src/main/java/backtype/storm/command/config_value.java
index 868ffdc..dd8812a 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/config_value.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/config_value.java
@@ -30,7 +30,7 @@ import backtype.storm.utils.Utils;
*
*/
public class config_value {
-
+
/**
* @param args
*/
@@ -39,12 +39,12 @@ public class config_value {
if (args == null || args.length == 0) {
throw new InvalidParameterException("Should input key name");
}
-
+
String key = args[0];
-
+
Map conf = Utils.readStormConfig();
-
+
System.out.print("VALUE: " + String.valueOf(conf.get(key)));
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/deactivate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/deactivate.java b/jstorm-core/src/main/java/backtype/storm/command/deactivate.java
index 22ac20d..59e97d6 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/deactivate.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/deactivate.java
@@ -30,7 +30,7 @@ import backtype.storm.utils.Utils;
*
*/
public class deactivate {
-
+
/**
* @param args
*/
@@ -39,17 +39,17 @@ public class deactivate {
if (args == null || args.length == 0) {
throw new InvalidParameterException("Should input topology name");
}
-
+
String topologyName = args[0];
-
+
NimbusClient client = null;
try {
-
+
Map conf = Utils.readStormConfig();
client = NimbusClient.getConfiguredClient(conf);
-
+
client.getClient().deactivate(topologyName);
-
+
System.out.println("Successfully submit command deactivate " + topologyName);
} catch (Exception e) {
System.out.println(e.getMessage());
@@ -61,5 +61,5 @@ public class deactivate {
}
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java b/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java
index 4ab3893..bda20e1 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java
@@ -17,13 +17,13 @@
*/
package backtype.storm.command;
-import java.security.InvalidParameterException;
-import java.util.Map;
-
import backtype.storm.generated.KillOptions;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
+import java.security.InvalidParameterException;
+import java.util.Map;
+
/**
* Kill topology
*
@@ -31,7 +31,7 @@ import backtype.storm.utils.Utils;
*
*/
public class kill_topology {
-
+
/**
* @param args
*/
@@ -40,28 +40,28 @@ public class kill_topology {
if (args == null || args.length == 0) {
throw new InvalidParameterException("Should input topology name");
}
-
+
String topologyName = args[0];
-
+
NimbusClient client = null;
try {
-
+
Map conf = Utils.readStormConfig();
client = NimbusClient.getConfiguredClient(conf);
-
+
if (args.length == 1) {
-
+
client.getClient().killTopology(topologyName);
} else {
int delaySeconds = Integer.parseInt(args[1]);
-
+
KillOptions options = new KillOptions();
options.set_wait_secs(delaySeconds);
-
+
client.getClient().killTopologyWithOpts(topologyName, options);
-
+
}
-
+
System.out.println("Successfully submit command kill " + topologyName);
} catch (Exception e) {
System.out.println(e.getMessage());
@@ -73,5 +73,5 @@ public class kill_topology {
}
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/list.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/list.java b/jstorm-core/src/main/java/backtype/storm/command/list.java
index 3b4efdb..0c6930d 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/list.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/list.java
@@ -33,29 +33,29 @@ import backtype.storm.utils.Utils;
*
*/
public class list {
-
+
/**
* @param args
*/
public static void main(String[] args) {
-
+
NimbusClient client = null;
try {
-
+
Map conf = Utils.readStormConfig();
client = NimbusClient.getConfiguredClient(conf);
-
+
if (args.length > 0 && StringUtils.isBlank(args[0]) == false) {
String topologyName = args[0];
TopologyInfo info = client.getClient().getTopologyInfoByName(topologyName);
-
+
System.out.println("Successfully get topology info \n" + Utils.toPrettyJsonString(info));
} else {
ClusterSummary clusterSummary = client.getClient().getClusterInfo();
-
+
System.out.println("Successfully get cluster info \n" + Utils.toPrettyJsonString(clusterSummary));
}
-
+
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
@@ -66,5 +66,5 @@ public class list {
}
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java b/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
index 6607445..a673fcf 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
@@ -17,13 +17,13 @@
*/
package backtype.storm.command;
-import java.util.Map;
-import java.security.InvalidParameterException;
-
import backtype.storm.generated.MonitorOptions;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
+import java.security.InvalidParameterException;
+import java.util.Map;
+
/**
* Monitor topology
*
@@ -31,7 +31,7 @@ import backtype.storm.utils.Utils;
*
*/
public class metrics_monitor {
-
+
/**
* @param args
*/
@@ -40,22 +40,22 @@ public class metrics_monitor {
if (args == null || args.length <= 1) {
throw new InvalidParameterException("Should input topology name and enable flag");
}
-
+
String topologyName = args[0];
-
+
NimbusClient client = null;
try {
-
+
Map conf = Utils.readStormConfig();
client = NimbusClient.getConfiguredClient(conf);
-
+
boolean isEnable = Boolean.valueOf(args[1]).booleanValue();
-
+
MonitorOptions options = new MonitorOptions();
options.set_isEnable(isEnable);
-
+
client.getClient().metricMonitor(topologyName, options);
-
+
String str = (isEnable) ? "enable" : "disable";
System.out.println("Successfully submit command to " + str + " the monitor of " + topologyName);
} catch (Exception e) {
@@ -68,5 +68,5 @@ public class metrics_monitor {
}
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/rebalance.java b/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
index f0cf69f..6d08934 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
@@ -17,13 +17,12 @@
*/
package backtype.storm.command;
-import java.security.InvalidParameterException;
-import java.util.Map;
-
import backtype.storm.generated.RebalanceOptions;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
+import java.util.Map;
+
/**
* Active topology
*
@@ -32,7 +31,7 @@ import backtype.storm.utils.Utils;
*/
public class rebalance {
static final String REASSIGN_FLAG = "-r";
-
+
/**
* @param args
*/
@@ -42,15 +41,15 @@ public class rebalance {
printErrorInfo();
return;
}
-
+
int argsIndex = 0;
String topologyName = null;
-
+
try {
RebalanceOptions options = new RebalanceOptions();
options.set_reassign(false);
options.set_conf(null);
-
+
if (args[argsIndex].equalsIgnoreCase(REASSIGN_FLAG)) {
options.set_reassign(true);
argsIndex++;
@@ -64,7 +63,7 @@ public class rebalance {
} else {
topologyName = args[argsIndex];
}
-
+
argsIndex++;
if (args.length > argsIndex) {
for (int i = argsIndex; i < args.length; i++) {
@@ -85,32 +84,34 @@ public class rebalance {
}
}
}
-
+
submitRebalance(topologyName, options);
-
- System.out.println("Successfully submit command rebalance " + topologyName + ", delaySecs=" + options.get_wait_secs() + ", reassignFlag=" + options.is_reassign() + ", newConfiguration=" + options.get_conf());
+
+ System.out.println("Successfully submit command rebalance " + topologyName + ", delaySecs=" +
+ options.get_wait_secs() + ", reassignFlag="
+ + options.is_reassign() + ", newConfiguration=" + options.get_conf());
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
throw new RuntimeException(e);
}
}
-
+
private static void printErrorInfo() {
System.out.println("Error: Invalid parameters!");
System.out.println("USAGE: jstorm rebalance [-r] TopologyName [DelayTime] [NewConfig]");
}
-
+
public static void submitRebalance(String topologyName, RebalanceOptions options) throws Exception {
submitRebalance(topologyName, options, null);
}
-
+
public static void submitRebalance(String topologyName, RebalanceOptions options, Map conf) throws Exception {
Map stormConf = Utils.readStormConfig();
if (conf != null) {
stormConf.putAll(conf);
}
-
+
NimbusClient client = null;
try {
client = NimbusClient.getConfiguredClient(stormConf);
@@ -123,5 +124,5 @@ public class rebalance {
}
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/restart.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/restart.java b/jstorm-core/src/main/java/backtype/storm/command/restart.java
index ecec9a3..5a216ea 100755
--- a/jstorm-core/src/main/java/backtype/storm/command/restart.java
+++ b/jstorm-core/src/main/java/backtype/storm/command/restart.java
@@ -45,26 +45,26 @@ public class restart {
if (args == null || args.length == 0) {
throw new InvalidParameterException("Should input topology name");
}
-
+
String topologyName = args[0];
-
+
NimbusClient client = null;
try {
Map conf = Utils.readStormConfig();
client = NimbusClient.getConfiguredClient(conf);
-
+
System.out.println("It will take 15 ~ 100 seconds to restart, please wait patiently\n");
-
+
if (args.length == 1) {
client.getClient().restart(topologyName, null);
} else {
Map loadConf = Utils.loadConf(args[1]);
String jsonConf = Utils.to_json(loadConf);
System.out.println("New configuration:\n" + jsonConf);
-
+
client.getClient().restart(topologyName, jsonConf);
}
-
+
System.out.println("Successfully submit command restart " + topologyName);
} catch (Exception e) {
System.out.println(e.getMessage());
@@ -76,5 +76,5 @@ public class restart {
}
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/update_config.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/update_config.java b/jstorm-core/src/main/java/backtype/storm/command/update_config.java
deleted file mode 100644
index be78f19..0000000
--- a/jstorm-core/src/main/java/backtype/storm/command/update_config.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Update user configuration
- *
- * @author basti
- *
- */
-public class update_config {
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- if (args == null || args.length < 2) {
- throw new InvalidParameterException(
- "[USAGE] update_config topologyName config");
- }
-
- String topologyName = args[0];
-
- NimbusClient client = null;
- try {
- Map conf = Utils.readStormConfig();
- client = NimbusClient.getConfiguredClient(conf);
-
- Map loadConf = Utils.loadConf(args[1]);
- String jsonConf = Utils.to_json(loadConf);
- System.out.println("New configuration:\n" + jsonConf);
-
- client.getClient().updateConf(topologyName, jsonConf);
-
- System.out.println("Successfully submit command update_conf "
- + topologyName);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- if (client != null) {
- client.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/update_topology.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/update_topology.java b/jstorm-core/src/main/java/backtype/storm/command/update_topology.java
new file mode 100644
index 0000000..85172a7
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/update_topology.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import backtype.storm.GenericOptionsParser;
+import backtype.storm.StormSubmitter;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.commons.cli.*;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by xiaojian.fxj on 2015/10/13.
+ */
+public class update_topology {
+ public static final String UPDATE_CONF = "-conf";
+
+ public static final String UPDATE_JAR = "-jar";
+
+ public static void usage() {
+ System.out.println("update topology config, please do as following:");
+ System.out.println("update_topology topologyName -conf configFile");
+
+ System.out.println("update topology jar, please do as following:");
+ System.out.println("update_topology topologyName -jar jarFile");
+
+ System.out.println("update topology jar and conf, please do as following:");
+ System.out.println("update_topology topologyName -jar jarFile -conf configFile");
+ }
+
+ private static Options buildGeneralOptions(Options opts) {
+ Options r = new Options();
+
+ for (Object o : opts.getOptions())
+ r.addOption((Option) o);
+
+ Option jar = OptionBuilder.withArgName("path").hasArg()
+ .withDescription("comma jar of the submitted topology")
+ .create("jar");
+ r.addOption(jar);
+
+ Option conf = OptionBuilder.withArgName("configuration file").hasArg()
+ .withDescription("an application configuration file")
+ .create("conf");
+ r.addOption(conf);
+ return r;
+ }
+
+ private static void updateTopology(String topologyName, String pathJar,
+ String pathConf) {
+ NimbusClient client = null;
+ Map loadMap = null;
+ if (pathConf != null) {
+ loadMap = Utils.loadConf(pathConf);
+ } else {
+ loadMap = new HashMap();
+ }
+
+ Map conf = Utils.readStormConfig();
+
+ conf.putAll(loadMap);
+ client = NimbusClient.getConfiguredClient(conf);
+ try {
+ // update jar
+ String uploadLocation = null;
+ if (pathJar != null) {
+ System.out.println("Jar update to master yet. Submitting jar of " + pathJar);
+ String path = client.getClient().beginFileUpload();
+ String[] pathCache = path.split("/");
+ uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar";
+ List<String> lib = (List<String>) conf .get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
+ Map<String, String> libPath = (Map<String, String>) conf .get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
+ if (lib != null && lib.size() != 0) {
+ for (String libName : lib) {
+ String jarPath = path + "/lib/" + libName;
+ client.getClient().beginLibUpload(jarPath);
+ StormSubmitter.submitJar(conf, libPath.get(libName), jarPath, client);
+ }
+
+ } else {
+ if (pathJar == null) {
+ // no lib, no client jar
+ throw new RuntimeException( "No client app jar, please upload it");
+ }
+ }
+
+ if (pathJar != null) {
+ StormSubmitter.submitJar(conf, pathJar, uploadLocation, client);
+ } else {
+ // no client jar, but with lib jar
+ client.getClient().finishFileUpload(uploadLocation);
+ }
+ }
+
+ // update topology
+ String jsonConf = Utils.to_json(loadMap);
+ System.out.println("New configuration:\n" + jsonConf);
+
+ client.getClient().updateTopology(topologyName, uploadLocation,
+ jsonConf);
+
+ System.out.println("Successfully submit command update " + topologyName);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ } finally {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ if (args == null || args.length < 3) {
+ System.out.println("Invalid parameter");
+ usage();
+ return;
+ }
+ String topologyName = args[0];
+ try {
+ String[] str2 = Arrays.copyOfRange(args, 1, args.length);
+ CommandLineParser parser = new GnuParser();
+ Options r = buildGeneralOptions(new Options());
+ CommandLine commandLine = parser.parse(r, str2, true);
+
+ String pathConf = null;
+ String pathJar = null;
+ if (commandLine.hasOption("conf")) {
+ pathConf = (commandLine.getOptionValues("conf"))[0];
+ }
+ if (commandLine.hasOption("jar")) {
+ pathJar = (commandLine.getOptionValues("jar"))[0];
+ }
+ if (pathConf != null || pathJar != null)
+ updateTopology(topologyName, pathJar, pathConf);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
index 8653010..d9163e5 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
@@ -32,18 +32,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCallback {
- public static Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
+ public static Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
byte[] _boltSer;
Map<Object, IBatchBolt> _openTransactions;
Map _conf;
TopologyContext _context;
BatchOutputCollectorImpl _collector;
-
+
public BatchBoltExecutor(IBatchBolt bolt) {
_boltSer = Utils.javaSerialize(bolt);
}
-
+
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_conf = conf;
@@ -57,11 +57,11 @@ public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCa
Object id = input.getValue(0);
IBatchBolt bolt = getBatchBolt(id);
try {
- bolt.execute(input);
+ bolt.execute(input);
_collector.ack(input);
- } catch(FailedException e) {
+ } catch (FailedException e) {
LOG.error("Failed to process tuple in batch", e);
- _collector.fail(input);
+ _collector.fail(input);
}
}
@@ -78,30 +78,29 @@ public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCa
@Override
public void timeoutId(Object attempt) {
- _openTransactions.remove(attempt);
- }
-
+ _openTransactions.remove(attempt);
+ }
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
newTransactionalBolt().declareOutputFields(declarer);
}
-
+
@Override
public Map<String, Object> getComponentConfiguration() {
return newTransactionalBolt().getComponentConfiguration();
}
-
+
private IBatchBolt getBatchBolt(Object id) {
IBatchBolt bolt = _openTransactions.get(id);
- if(bolt==null) {
+ if (bolt == null) {
bolt = newTransactionalBolt();
bolt.prepare(_conf, _context, _collector, id);
- _openTransactions.put(id, bolt);
+ _openTransactions.put(id, bolt);
}
return bolt;
}
-
+
private IBatchBolt newTransactionalBolt() {
return Utils.javaDeserialize(_boltSer, IBatchBolt.class);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
index f5f3457..0b99339 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
@@ -30,17 +30,16 @@ public abstract class BatchOutputCollector {
}
public abstract List<Integer> emit(String streamId, List<Object> tuple);
-
+
/**
- * Emits a tuple to the specified task on the default output stream. This output
- * stream must have been declared as a direct stream, and the specified task must
- * use a direct grouping on this stream to receive the message.
+ * Emits a tuple to the specified task on the default output stream. This output stream must have been declared as a direct stream, and the specified task
+ * must use a direct grouping on this stream to receive the message.
*/
public void emitDirect(int taskId, List<Object> tuple) {
emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
}
-
- public abstract void emitDirect(int taskId, String streamId, List<Object> tuple);
-
+
+ public abstract void emitDirect(int taskId, String streamId, List<Object> tuple);
+
public abstract void reportError(Throwable error);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
index cae7560..44a1f01 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
@@ -23,11 +23,11 @@ import java.util.List;
public class BatchOutputCollectorImpl extends BatchOutputCollector {
OutputCollector _collector;
-
+
public BatchOutputCollectorImpl(OutputCollector collector) {
_collector = collector;
}
-
+
@Override
public List<Integer> emit(String streamId, List<Object> tuple) {
return _collector.emit(streamId, tuple);
@@ -42,11 +42,11 @@ public class BatchOutputCollectorImpl extends BatchOutputCollector {
public void reportError(Throwable error) {
_collector.reportError(error);
}
-
+
public void ack(Tuple tup) {
_collector.ack(tup);
}
-
+
public void fail(Tuple tup) {
_collector.fail(tup);
}