Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:55 UTC
[17/51] [partial] storm git commit: Update JStorm to latest release 2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java
index 02b574f..3871074 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java
@@ -17,36 +17,37 @@
*/
package com.alibaba.jstorm.daemon.nimbus;
-import java.io.IOException;
-import java.nio.channels.Channel;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import backtype.storm.Config;
+import backtype.storm.generated.TopologyTaskHbInfo;
+import backtype.storm.scheduler.INimbus;
+import backtype.storm.utils.BufferFileInputStream;
+import backtype.storm.utils.TimeCacheMap;
import com.alibaba.jstorm.cache.JStormCache;
+import com.alibaba.jstorm.callback.AsyncLoopThread;
+import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.cluster.StormZkClusterState;
+import com.alibaba.jstorm.metric.JStormMetricCache;
+import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.task.TkHbCacheTime;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import backtype.storm.Config;
-import backtype.storm.scheduler.INimbus;
-import backtype.storm.utils.BufferFileInputStream;
-import backtype.storm.utils.TimeCacheMap;
+import java.io.IOException;
+import java.nio.channels.Channel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* All nimbus data
- *
*/
public class NimbusData {
private static final Logger LOG = LoggerFactory.getLogger(NimbusData.class);
@@ -62,7 +63,7 @@ public class NimbusData {
private TimeCacheMap<Object, Object> downloaders;
private TimeCacheMap<Object, Object> uploaders;
// cache thrift response to avoid scanning zk too frequently
- private NimbusCache cache;
+ private NimbusCache nimbusCache;
private int startTime;
@@ -82,17 +83,24 @@ public class NimbusData {
private AtomicBoolean isShutdown = new AtomicBoolean(false);
- private final TopologyMetricsRunnable metricRunnable;
+ private TopologyMetricsRunnable metricRunnable;
+ private AsyncLoopThread metricLoopThread;
// Topologies that have been submitted, but whose assignment is not yet
// finished
private TimeCacheMap<String, Object> pendingSubmitTopologys;
-
private Map<String, Integer> topologyTaskTimeout;
-
- private TopologyNettyMgr topologyNettyMgr ;
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ // Map<TopologyId, TasksHeartbeat>
+ private Map<String, TopologyTaskHbInfo> tasksHeartbeat;
+
+ private final JStormMetricCache metricCache;
+
+ private final String clusterName;
+
+ private JStormMetricsReporter metricsReporter;
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
public NimbusData(Map conf, INimbus inimbus) throws Exception {
this.conf = conf;
@@ -104,8 +112,7 @@ public class NimbusData {
createCache();
- this.taskHeartbeatsCache =
- new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>();
+ this.taskHeartbeatsCache = new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>();
this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM);
@@ -117,66 +124,63 @@ public class NimbusData {
localMode = StormConfig.local_mode(conf);
- this.topologyNettyMgr = new TopologyNettyMgr(conf);
+ this.metricCache = new JStormMetricCache(conf, this.stormClusterState);
+ this.clusterName = ConfigExtension.getClusterName(conf);
+
this.metricRunnable = new TopologyMetricsRunnable(this);
+ this.metricRunnable.init();
- pendingSubmitTopologys =
- new TimeCacheMap<String, Object>(JStormUtils.MIN_30);
-
+ pendingSubmitTopologys = new TimeCacheMap<String, Object>(JStormUtils.MIN_30);
topologyTaskTimeout = new ConcurrentHashMap<String, Integer>();
+ tasksHeartbeat = new ConcurrentHashMap<String, TopologyTaskHbInfo>();
+
+ if (!localMode) {
+ startMetricThreads();
+ }
}
- /**
- * Just for test
- */
- public NimbusData() {
- scheduExec = Executors.newScheduledThreadPool(6);
+ public void startMetricThreads() {
+ this.metricRunnable.start();
- inimubs = null;
- conf = new HashMap<Object, Object>();
- localMode = false;
- this.metricRunnable = new TopologyMetricsRunnable(this);
+ // init nimbus metric reporter
+ this.metricsReporter = new JStormMetricsReporter(this);
+ this.metricsReporter.init();
}
public void createFileHandler() {
- TimeCacheMap.ExpiredCallback<Object, Object> expiredCallback =
- new TimeCacheMap.ExpiredCallback<Object, Object>() {
- @Override
- public void expire(Object key, Object val) {
- try {
- LOG.info("Close file " + String.valueOf(key));
- if (val != null) {
- if (val instanceof Channel) {
- Channel channel = (Channel) val;
- channel.close();
- } else if (val instanceof BufferFileInputStream) {
- BufferFileInputStream is =
- (BufferFileInputStream) val;
- is.close();
- }
- }
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
+ TimeCacheMap.ExpiredCallback<Object, Object> expiredCallback = new TimeCacheMap.ExpiredCallback<Object, Object>() {
+ @Override
+ public void expire(Object key, Object val) {
+ try {
+ LOG.info("Close file " + String.valueOf(key));
+ if (val != null) {
+ if (val instanceof Channel) {
+ Channel channel = (Channel) val;
+ channel.close();
+ } else if (val instanceof BufferFileInputStream) {
+ BufferFileInputStream is = (BufferFileInputStream) val;
+ is.close();
}
-
}
- };
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+
+ }
+ };
- int file_copy_expiration_secs =
- JStormUtils.parseInt(
- conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30);
- uploaders =
- new TimeCacheMap<Object, Object>(file_copy_expiration_secs,
- expiredCallback);
- downloaders =
- new TimeCacheMap<Object, Object>(file_copy_expiration_secs,
- expiredCallback);
+ int file_copy_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30);
+ uploaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback);
+ downloaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback);
}
public void createCache() throws IOException {
- cache = new NimbusCache(conf, stormClusterState);
-
- ((StormZkClusterState) stormClusterState).setCache(cache.getMemCache());
+ nimbusCache = new NimbusCache(conf, stormClusterState);
+ ((StormZkClusterState) stormClusterState).setCache(nimbusCache.getMemCache());
+ }
+
+ public String getClusterName() {
+ return clusterName;
}
public int uptime() {
@@ -203,8 +207,7 @@ public class NimbusData {
return taskHeartbeatsCache;
}
- public Map<Integer, TkHbCacheTime> getTaskHeartbeatsCache(
- String topologyId, boolean createIfNotExist) {
+ public Map<Integer, TkHbCacheTime> getTaskHeartbeatsCache(String topologyId, boolean createIfNotExist) {
Map<Integer, TkHbCacheTime> ret = null;
ret = taskHeartbeatsCache.get(topologyId);
if (ret == null && createIfNotExist) {
@@ -214,8 +217,7 @@ public class NimbusData {
return ret;
}
- public void setTaskHeartbeatsCache(
- ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache) {
+ public void setTaskHeartbeatsCache(ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache) {
this.taskHeartbeatsCache = taskHeartbeatsCache;
}
@@ -256,7 +258,7 @@ public class NimbusData {
}
public void cleanup() {
- cache.cleanup();
+ nimbusCache.cleanup();
LOG.info("Successfully shutdown Cache");
try {
stormClusterState.disconnect();
@@ -296,15 +298,19 @@ public class NimbusData {
}
public JStormCache getMemCache() {
- return cache.getMemCache();
+ return nimbusCache.getMemCache();
}
-
+
public JStormCache getDbCache() {
- return cache.getDbCache();
+ return nimbusCache.getDbCache();
}
-
+
public NimbusCache getNimbusCache() {
- return cache;
+ return nimbusCache;
+ }
+
+ public JStormMetricCache getMetricCache() {
+ return metricCache;
}
public final TopologyMetricsRunnable getMetricRunnable() {
@@ -319,9 +325,7 @@ public class NimbusData {
return topologyTaskTimeout;
}
- public TopologyNettyMgr getTopologyNettyMgr() {
- return topologyNettyMgr;
- }
-
-
+ public Map<String, TopologyTaskHbInfo> getTasksHeartbeat() {
+ return tasksHeartbeat;
+ }
}
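Note on the NimbusData changes above: the cache field becomes nimbusCache, and task heartbeats now also live in an in-memory tasksHeartbeat map (Map<TopologyId, TopologyTaskHbInfo>), so callers no longer need to scan ZK per task. Below is a minimal standalone sketch (not the JStorm class; value types are illustrative placeholders) of the createIfNotExist lookup pattern that getTaskHeartbeatsCache(String, boolean) follows; putIfAbsent is used here to keep the sketch race-free, whereas the original does a plain put.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the per-topology cache created on first access
// when createIfNotExist is true, mirroring NimbusData.getTaskHeartbeatsCache.
public class HeartbeatCacheSketch {
    private final ConcurrentHashMap<String, Map<Integer, Long>> taskHeartbeatsCache =
            new ConcurrentHashMap<String, Map<Integer, Long>>();

    public Map<Integer, Long> getTaskHeartbeatsCache(String topologyId, boolean createIfNotExist) {
        Map<Integer, Long> ret = taskHeartbeatsCache.get(topologyId);
        if (ret == null && createIfNotExist) {
            ret = new ConcurrentHashMap<Integer, Long>();
            Map<Integer, Long> existing = taskHeartbeatsCache.putIfAbsent(topologyId, ret);
            if (existing != null) {
                ret = existing; // another thread created it first
            }
        }
        return ret;
    }
}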
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java
index b22088e..5d5e18c 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java
@@ -17,59 +17,51 @@
*/
package com.alibaba.jstorm.daemon.nimbus;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.Nimbus.Iface;
import backtype.storm.scheduler.INimbus;
import backtype.storm.utils.Utils;
-
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.supervisor.Httpserver;
import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb;
-import com.alibaba.jstorm.metric.SimpleJStormMetric;
import com.alibaba.jstorm.schedule.CleanRunnable;
import com.alibaba.jstorm.schedule.FollowerRunnable;
import com.alibaba.jstorm.schedule.MonitorRunnable;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
*
- * NimbusServer work flow: 1. cleanup interrupted topology delete
- * /storm-local-dir/nimbus/topologyid/stormdis delete
- * /storm-zk-root/storms/topologyid
+ * NimbusServer workflow: 1. clean up interrupted topologies: delete /storm-local-dir/nimbus/topologyid/stormdist and delete /storm-zk-root/storms/topologyid
*
* 2. set /storm-zk-root/storms/topology status as run
*
- * 3. start one thread, every nimbus.monitor.reeq.secs set
- * /storm-zk-root/storms/ all topology as monitor. when the topology's status is
- * monitor, nimubs would reassign workers 4. start one threa, every
- * nimubs.cleanup.inbox.freq.secs cleanup useless jar
+ * 3. start one thread: every nimbus.monitor.freq.secs, set all topologies under /storm-zk-root/storms/ as monitor; when a topology's status is monitor, nimbus
+ * will reassign its workers. 4. start one thread: every nimbus.cleanup.inbox.freq.secs, clean up useless jars
*
* @author version 1: Nathan Marz version 2: Lixin/Chenjun version 3: Longda
*
*/
public class NimbusServer {
- private static final Logger LOG = LoggerFactory
- .getLogger(NimbusServer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NimbusServer.class);
private NimbusData data;
@@ -83,8 +75,7 @@ public class NimbusServer {
private Httpserver hs;
- private List<AsyncLoopThread> smartThreads =
- new ArrayList<AsyncLoopThread>();
+ private List<AsyncLoopThread> smartThreads = new ArrayList<AsyncLoopThread>();
public static void main(String[] args) throws Exception {
// read configuration files
@@ -134,8 +125,6 @@ public class NimbusServer {
while (!data.isLeader())
Utils.sleep(5000);
- initUploadMetricThread(data);
-
init(conf);
} catch (Throwable e) {
LOG.error("Fail to run nimbus ", e);
@@ -146,8 +135,7 @@ public class NimbusServer {
LOG.info("Quit nimbus");
}
- public ServiceHandler launcherLocalServer(final Map conf, INimbus inimbus)
- throws Exception {
+ public ServiceHandler launcherLocalServer(final Map conf, INimbus inimbus) throws Exception {
LOG.info("Begin to start nimbus on local model");
StormConfig.validate_local_mode(conf);
@@ -156,9 +144,6 @@ public class NimbusServer {
data = createNimbusData(conf, inimbus);
- // @@@ testing
- initUploadMetricThread(data);
-
init(conf);
return serviceHandler;
@@ -184,6 +169,8 @@ public class NimbusServer {
serviceHandler = new ServiceHandler(data);
if (!data.isLocalMode()) {
+
+ //data.startMetricThreads();
initMonitor(conf);
@@ -193,15 +180,11 @@ public class NimbusServer {
}
@SuppressWarnings("rawtypes")
- private NimbusData createNimbusData(Map conf, INimbus inimbus)
- throws Exception {
+ private NimbusData createNimbusData(Map conf, INimbus inimbus) throws Exception {
// Callback callback=new TimerCallBack();
// StormTimer timer=Timer.mkTimerTimer(callback);
- NimbusData data = new NimbusData(conf, inimbus);
-
- return data;
-
+ return new NimbusData(conf, inimbus);
}
private void initTopologyAssign() {
@@ -218,9 +201,9 @@ public class NimbusServer {
for (String topologyid : active_ids) {
// set the topology status as startup
// in fact, startup won't change anything
- NimbusUtils.transition(data, topologyid, false,
- StatusType.startup);
+ NimbusUtils.transition(data, topologyid, false, StatusType.startup);
NimbusUtils.updateTopologyTaskTimeout(data, topologyid);
+ NimbusUtils.updateTopologyTaskHb(data, topologyid);
}
}
@@ -235,20 +218,15 @@ public class NimbusServer {
// Schedule Nimbus monitor
MonitorRunnable r1 = new MonitorRunnable(data);
- int monitor_freq_secs =
- JStormUtils.parseInt(conf.get(Config.NIMBUS_MONITOR_FREQ_SECS),
- 10);
- scheduExec.scheduleAtFixedRate(r1, 0, monitor_freq_secs,
- TimeUnit.SECONDS);
+ int monitor_freq_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_MONITOR_FREQ_SECS), 10);
+ scheduExec.scheduleAtFixedRate(r1, 0, monitor_freq_secs, TimeUnit.SECONDS);
LOG.info("Successfully init Monitor thread");
}
/**
- * Right now, every 600 seconds, nimbus will clean jar under
- * /LOCAL-DIR/nimbus/inbox, which is the uploading topology directory
+ * Right now, every 600 seconds, nimbus will clean jars under /LOCAL-DIR/nimbus/inbox, which is the topology upload directory
*
- * @param conf
* @throws IOException
*/
@SuppressWarnings("rawtypes")
@@ -257,39 +235,25 @@ public class NimbusServer {
// Schedule Nimbus inbox cleaner/nimbus/inbox jar
String dir_location = StormConfig.masterInbox(conf);
- int inbox_jar_expiration_secs =
- JStormUtils
- .parseInt(conf
- .get(Config.NIMBUS_INBOX_JAR_EXPIRATION_SECS),
- 3600);
- CleanRunnable r2 =
- new CleanRunnable(dir_location, inbox_jar_expiration_secs);
+ int inbox_jar_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_INBOX_JAR_EXPIRATION_SECS), 3600);
+ CleanRunnable r2 = new CleanRunnable(dir_location, inbox_jar_expiration_secs);
- int cleanup_inbox_freq_secs =
- JStormUtils.parseInt(
- conf.get(Config.NIMBUS_CLEANUP_INBOX_FREQ_SECS), 600);
-
- scheduExec.scheduleAtFixedRate(r2, 0, cleanup_inbox_freq_secs,
- TimeUnit.SECONDS);
+ int cleanup_inbox_freq_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_CLEANUP_INBOX_FREQ_SECS), 600);
+ scheduExec.scheduleAtFixedRate(r2, 0, cleanup_inbox_freq_secs, TimeUnit.SECONDS);
LOG.info("Successfully init " + dir_location + " cleaner");
}
@SuppressWarnings("rawtypes")
private void initThrift(Map conf) throws TTransportException {
- Integer thrift_port =
- JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT));
- TNonblockingServerSocket socket =
- new TNonblockingServerSocket(thrift_port);
+ Integer thrift_port = JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT));
+ TNonblockingServerSocket socket = new TNonblockingServerSocket(thrift_port);
- Integer maxReadBufSize =
- JStormUtils.parseInt(conf
- .get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE));
+ Integer maxReadBufSize = JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE));
THsHaServer.Args args = new THsHaServer.Args(socket);
args.workerThreads(ServiceHandler.THREAD_NUM);
- args.protocolFactory(new TBinaryProtocol.Factory(false, true,
- maxReadBufSize, -1));
+ args.protocolFactory(new TBinaryProtocol.Factory(false, true, maxReadBufSize, -1));
args.processor(new Nimbus.Processor<Iface>(serviceHandler));
args.maxReadBufferBytes = maxReadBufSize;
@@ -317,53 +281,15 @@ public class NimbusServer {
});
}
- private void initUploadMetricThread(NimbusData data) {
- final TopologyMetricsRunnable metricRunnable = data.getMetricRunnable();
-
- int threadNum = ConfigExtension.getNimbusMetricThreadNum(data.getConf());
-
- for (int i = 0; i < threadNum; i++) {
- AsyncLoopThread thread = new AsyncLoopThread(metricRunnable);
- smartThreads.add(thread);
- }
-
- Runnable pusher = new Runnable() {
-
- @Override
- public void run() {
- // TODO Auto-generated method stub
- TopologyMetricsRunnable.Upload event =
- new TopologyMetricsRunnable.Upload();
- event.timeStamp = System.currentTimeMillis();
-
- metricRunnable.pushEvent(event);
- }
-
- };
-
- ScheduledExecutorService scheduleService = data.getScheduExec();
- scheduleService.scheduleAtFixedRate(pusher, 120, 60,
- TimeUnit.SECONDS);
-
- SimpleJStormMetric nimbusMetric = SimpleJStormMetric.mkInstance();
- scheduleService.scheduleAtFixedRate(nimbusMetric, 120, 60,
- TimeUnit.SECONDS);
-
- //AsyncLoopThread nimbusCacheThread = new AsyncLoopThread(data.getNimbusCache().getCacheRunnable());
- //smartThreads.add(nimbusCacheThread);
-
- LOG.info("Successfully init metrics uploading thread");
- }
-
public void cleanup() {
- if (data.getIsShutdown().getAndSet(true) == true) {
+ if (data.getIsShutdown().getAndSet(true)) {
LOG.info("Notify to quit nimbus");
return;
}
LOG.info("Begin to shutdown nimbus");
AsyncLoopRunnable.getShutdown().set(true);
-
+
data.getScheduExec().shutdownNow();
for (AsyncLoopThread t : smartThreads) {
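Note on the NimbusServer hunks above: initMonitor() and initCleaner() collapse the wrapped calls onto single lines but keep the same pattern, a ScheduledExecutorService running MonitorRunnable and CleanRunnable at a fixed rate. A self-contained sketch of that pattern follows; the runnable bodies are stand-ins, and 10s/600s are the defaults read from the hunks above.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the fixed-rate scheduling used by NimbusServer.
public class NimbusScheduleSketch {
    public static void main(String[] args) {
        ScheduledExecutorService scheduExec = Executors.newScheduledThreadPool(2);

        Runnable monitor = new Runnable() {
            @Override
            public void run() {
                System.out.println("monitor tick"); // stand-in for MonitorRunnable
            }
        };
        Runnable cleaner = new Runnable() {
            @Override
            public void run() {
                System.out.println("inbox cleanup tick"); // stand-in for CleanRunnable
            }
        };

        // initial delay 0, then a fixed period, as in initMonitor()/initCleaner()
        scheduExec.scheduleAtFixedRate(monitor, 0, 10, TimeUnit.SECONDS);  // NIMBUS_MONITOR_FREQ_SECS default
        scheduExec.scheduleAtFixedRate(cleaner, 0, 600, TimeUnit.SECONDS); // NIMBUS_CLEANUP_INBOX_FREQ_SECS default
    }
}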
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java
index 7181e77..4e032e3 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java
@@ -17,53 +17,31 @@
*/
package com.alibaba.jstorm.daemon.nimbus;
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.security.InvalidParameterException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.NimbusStat;
-import backtype.storm.generated.NimbusSummary;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SupervisorSummary;
-import backtype.storm.generated.TopologySummary;
+import backtype.storm.generated.*;
import backtype.storm.utils.ThriftTopologyUtils;
import backtype.storm.utils.Utils;
-
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
+import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormBase;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
+import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.TkHbCacheTime;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.InvalidParameterException;
+import java.util.*;
+import java.util.Map.Entry;
public class NimbusUtils {
@@ -71,7 +49,6 @@ public class NimbusUtils {
/**
* add custom KRYO serialization
- *
*/
private static Map mapifySerializations(List sers) {
Map rtn = new HashMap();
@@ -92,8 +69,6 @@ public class NimbusUtils {
/**
* Normalize stormConf
*
- *
- *
* @param conf
* @param stormConf
* @param topology
@@ -101,8 +76,7 @@ public class NimbusUtils {
* @throws Exception
*/
@SuppressWarnings("rawtypes")
- public static Map normalizeConf(Map conf, Map stormConf,
- StormTopology topology) throws Exception {
+ public static Map normalizeConf(Map conf, Map stormConf, StormTopology topology) throws Exception {
List kryoRegisterList = new ArrayList();
List kryoDecoratorList = new ArrayList();
@@ -113,18 +87,14 @@ public class NimbusUtils {
Object totalRegister = totalConf.get(Config.TOPOLOGY_KRYO_REGISTER);
if (totalRegister != null) {
- LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME)
- + ", TOPOLOGY_KRYO_REGISTER"
- + totalRegister.getClass().getName());
+ LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) + ", TOPOLOGY_KRYO_REGISTER" + totalRegister.getClass().getName());
JStormUtils.mergeList(kryoRegisterList, totalRegister);
}
Object totalDecorator = totalConf.get(Config.TOPOLOGY_KRYO_DECORATORS);
if (totalDecorator != null) {
- LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME)
- + ", TOPOLOGY_KRYO_DECORATOR"
- + totalDecorator.getClass().getName());
+ LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) + ", TOPOLOGY_KRYO_DECORATOR" + totalDecorator.getClass().getName());
JStormUtils.mergeList(kryoDecoratorList, totalDecorator);
}
@@ -132,9 +102,7 @@ public class NimbusUtils {
for (Iterator it = cids.iterator(); it.hasNext();) {
String componentId = (String) it.next();
- ComponentCommon common =
- ThriftTopologyUtils.getComponentCommon(topology,
- componentId);
+ ComponentCommon common = ThriftTopologyUtils.getComponentCommon(topology, componentId);
String json = common.get_json_conf();
if (json == null) {
continue;
@@ -150,24 +118,18 @@ public class NimbusUtils {
throw new Exception(sb.toString());
}
- Object componentKryoRegister =
- mtmp.get(Config.TOPOLOGY_KRYO_REGISTER);
+ Object componentKryoRegister = mtmp.get(Config.TOPOLOGY_KRYO_REGISTER);
if (componentKryoRegister != null) {
- LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME)
- + ", componentId:" + componentId
- + ", TOPOLOGY_KRYO_REGISTER"
+ LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) + ", componentId:" + componentId + ", TOPOLOGY_KRYO_REGISTER"
+ componentKryoRegister.getClass().getName());
JStormUtils.mergeList(kryoRegisterList, componentKryoRegister);
}
- Object componentDecorator =
- mtmp.get(Config.TOPOLOGY_KRYO_DECORATORS);
+ Object componentDecorator = mtmp.get(Config.TOPOLOGY_KRYO_DECORATORS);
if (componentDecorator != null) {
- LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME)
- + ", componentId:" + componentId
- + ", TOPOLOGY_KRYO_DECORATOR"
+ LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) + ", componentId:" + componentId + ", TOPOLOGY_KRYO_DECORATOR"
+ componentDecorator.getClass().getName());
JStormUtils.mergeList(kryoDecoratorList, componentDecorator);
}
@@ -177,25 +139,23 @@ public class NimbusUtils {
Map kryoRegisterMap = mapifySerializations(kryoRegisterList);
List decoratorList = JStormUtils.distinctList(kryoDecoratorList);
- Integer ackerNum =
- JStormUtils.parseInt(totalConf
- .get(Config.TOPOLOGY_ACKER_EXECUTORS));
+ Integer ackerNum = JStormUtils.parseInt(totalConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
if (ackerNum == null) {
ackerNum = Integer.valueOf(1);
}
Map rtn = new HashMap();
+ // ensure STORM_CLUSTER_MODE is set from the cluster conf
+ rtn.put(Config.STORM_CLUSTER_MODE, conf.get(Config.STORM_CLUSTER_MODE));
rtn.putAll(stormConf);
- rtn.put(Config.TOPOLOGY_KRYO_DECORATORS, decoratorList);
+ rtn.put(Config.TOPOLOGY_KRYO_DECORATORS, decoratorList);
rtn.put(Config.TOPOLOGY_KRYO_REGISTER, kryoRegisterMap);
rtn.put(Config.TOPOLOGY_ACKER_EXECUTORS, ackerNum);
- rtn.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM,
- totalConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
+ rtn.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, totalConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
return rtn;
}
- public static Integer componentParalism(Map stormConf,
- ComponentCommon common) {
+ public static Integer componentParalism(Map stormConf, ComponentCommon common) {
Map mergeMap = new HashMap();
mergeMap.putAll(stormConf);
@@ -223,8 +183,7 @@ public class NimbusUtils {
// }
// }
- Object maxTaskParalismObject =
- mergeMap.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
+ Object maxTaskParalismObject = mergeMap.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
if (maxTaskParalismObject == null) {
return taskNum;
} else {
@@ -239,24 +198,19 @@ public class NimbusUtils {
* finalize component's task parallelism
*
* @param topology
- * @param fromConf means if the paralism is read from conf file instead of
- * reading from topology code
+ * @param fromConf whether the parallelism is read from the conf file instead of from the topology code
* @return
*/
- public static StormTopology normalizeTopology(Map stormConf,
- StormTopology topology, boolean fromConf) {
+ public static StormTopology normalizeTopology(Map stormConf, StormTopology topology, boolean fromConf) {
StormTopology ret = topology.deepCopy();
- Map<String, Object> rawComponents =
- ThriftTopologyUtils.getComponents(topology);
+ Map<String, Object> rawComponents = ThriftTopologyUtils.getComponents(topology);
Map<String, Object> components = ThriftTopologyUtils.getComponents(ret);
if (rawComponents.keySet().equals(components.keySet()) == false) {
- String errMsg =
- "Failed to normalize topology binary, maybe due to wrong dependency";
- LOG.info(errMsg + " raw components:" + rawComponents.keySet()
- + ", normalized " + components.keySet());
+ String errMsg = "Failed to normalize topology binary, maybe due to wrong dependency";
+ LOG.info(errMsg + " raw components:" + rawComponents.keySet() + ", normalized " + components.keySet());
throw new InvalidParameterException(errMsg);
}
@@ -269,9 +223,7 @@ public class NimbusUtils {
if (component instanceof Bolt) {
common = ((Bolt) component).get_common();
if (fromConf) {
- Integer paraNum =
- ConfigExtension.getBoltParallelism(stormConf,
- componentName);
+ Integer paraNum = ConfigExtension.getBoltParallelism(stormConf, componentName);
if (paraNum != null) {
LOG.info("Set " + componentName + " as " + paraNum);
common.set_parallelism_hint(paraNum);
@@ -281,9 +233,7 @@ public class NimbusUtils {
if (component instanceof SpoutSpec) {
common = ((SpoutSpec) component).get_common();
if (fromConf) {
- Integer paraNum =
- ConfigExtension.getSpoutParallelism(stormConf,
- componentName);
+ Integer paraNum = ConfigExtension.getSpoutParallelism(stormConf, componentName);
if (paraNum != null) {
LOG.info("Set " + componentName + " as " + paraNum);
common.set_parallelism_hint(paraNum);
@@ -293,9 +243,7 @@ public class NimbusUtils {
if (component instanceof StateSpoutSpec) {
common = ((StateSpoutSpec) component).get_common();
if (fromConf) {
- Integer paraNum =
- ConfigExtension.getSpoutParallelism(stormConf,
- componentName);
+ Integer paraNum = ConfigExtension.getSpoutParallelism(stormConf, componentName);
if (paraNum != null) {
LOG.info("Set " + componentName + " as " + paraNum);
common.set_parallelism_hint(paraNum);
@@ -307,8 +255,7 @@ public class NimbusUtils {
String jsonConfString = common.get_json_conf();
if (jsonConfString != null) {
- componentMap
- .putAll((Map) JStormUtils.from_json(jsonConfString));
+ componentMap.putAll((Map) JStormUtils.from_json(jsonConfString));
}
Integer taskNum = componentParalism(stormConf, common);
@@ -328,20 +275,16 @@ public class NimbusUtils {
* clean the topology which is in ZK but not in local dir
*
* @throws Exception
- *
*/
- public static void cleanupCorruptTopologies(NimbusData data)
- throws Exception {
+ public static void cleanupCorruptTopologies(NimbusData data) throws Exception {
StormClusterState stormClusterState = data.getStormClusterState();
// get /local-storm-dir/nimbus/stormdist path
- String master_stormdist_root =
- StormConfig.masterStormdistRoot(data.getConf());
+ String master_stormdist_root = StormConfig.masterStormdistRoot(data.getConf());
// listdir /local-storm-dir/nimbus/stormdist
- List<String> code_ids =
- PathUtils.read_dir_contents(master_stormdist_root);
+ List<String> code_ids = PathUtils.read_dir_contents(master_stormdist_root);
// get topology in ZK /storms
List<String> active_ids = data.getStormClusterState().active_storms();
@@ -352,9 +295,7 @@ public class NimbusUtils {
}
for (String corrupt : active_ids) {
- LOG.info("Corrupt topology "
- + corrupt
- + " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...");
+ LOG.info("Corrupt topology " + corrupt + " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...");
/**
* Just removing the /STORMS is enough
@@ -368,53 +309,47 @@ public class NimbusUtils {
}
- public static boolean isTaskDead(NimbusData data, String topologyId,
- Integer taskId) {
+ public static boolean isTaskDead(NimbusData data, String topologyId, Integer taskId) {
String idStr = " topology:" + topologyId + ",taskid:" + taskId;
- Integer zkReportTime = null;
+ TopologyTaskHbInfo topoTasksHbInfo = data.getTasksHeartbeat().get(topologyId);
+ Map<Integer, TaskHeartbeat> taskHbMap = null;
+ Integer taskReportTime = null;
- StormClusterState stormClusterState = data.getStormClusterState();
- TaskHeartbeat zkTaskHeartbeat = null;
- try {
- zkTaskHeartbeat =
- stormClusterState.task_heartbeat(topologyId, taskId);
- if (zkTaskHeartbeat != null) {
- zkReportTime = zkTaskHeartbeat.getTimeSecs();
+ if (topoTasksHbInfo != null) {
+ taskHbMap = topoTasksHbInfo.get_taskHbs();
+ if (taskHbMap != null) {
+ TaskHeartbeat tHb = taskHbMap.get(taskId);
+ taskReportTime = ((tHb != null) ? tHb.get_time() : null);
}
- } catch (Exception e) {
- LOG.error("Failed to get ZK task hearbeat " + idStr, e);
}
- Map<Integer, TkHbCacheTime> taskHBs =
- data.getTaskHeartbeatsCache(topologyId, true);
+ Map<Integer, TkHbCacheTime> taskHBs = data.getTaskHeartbeatsCache(topologyId, true);
TkHbCacheTime taskHB = taskHBs.get(taskId);
if (taskHB == null) {
LOG.info("No task heartbeat cache " + idStr);
- if (zkTaskHeartbeat == null) {
- LOG.info("No ZK task hearbeat " + idStr);
+ if (topoTasksHbInfo == null || taskHbMap == null) {
+ LOG.info("No task hearbeat was reported for " + idStr);
return true;
}
taskHB = new TkHbCacheTime();
- taskHB.update(zkTaskHeartbeat);
+ taskHB.update(taskHbMap.get(taskId));
taskHBs.put(taskId, taskHB);
return false;
}
- if (zkReportTime == null) {
- LOG.debug("No ZK task heartbeat " + idStr);
+ if (taskReportTime == null || taskReportTime < taskHB.getTaskAssignedTime()) {
+ LOG.debug("No task heartbeat was reported for " + idStr);
// Task hasn't finished init
int nowSecs = TimeUtils.current_time_secs();
int assignSecs = taskHB.getTaskAssignedTime();
- int waitInitTimeout =
- JStormUtils.parseInt(data.getConf().get(
- Config.NIMBUS_TASK_LAUNCH_SECS));
+ int waitInitTimeout = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_LAUNCH_SECS));
if (nowSecs - assignSecs > waitInitTimeout) {
LOG.info(idStr + " failed to init ");
@@ -433,30 +368,29 @@ public class NimbusUtils {
int nowSecs = TimeUtils.current_time_secs();
if (nimbusTime == 0) {
// taskHB no entry, first time
- // update taskHB
+ // update taskHB with the new taskReportTime
taskHB.setNimbusTime(nowSecs);
- taskHB.setTaskReportedTime(zkReportTime);
+ taskHB.setTaskReportedTime(taskReportTime);
LOG.info("Update taskheartbeat to nimbus cache " + idStr);
return false;
}
- if (reportTime != zkReportTime.intValue()) {
+ if (reportTime != taskReportTime.intValue()) {
// the report time has been updated
taskHB.setNimbusTime(nowSecs);
- taskHB.setTaskReportedTime(zkReportTime);
+ taskHB.setTaskReportedTime(taskReportTime);
- LOG.debug(idStr + ",nimbusTime " + nowSecs + ",zkReport:"
- + zkReportTime + ",report:" + reportTime);
+ LOG.debug(idStr + ",nimbusTime " + nowSecs + ",zkReport:" + taskReportTime + ",report:" + reportTime);
return false;
}
// the following is (taskReportTime == reportTime)
Integer taskHBTimeout = data.getTopologyTaskTimeout().get(topologyId);
if (taskHBTimeout == null)
- taskHBTimeout =
- JStormUtils.parseInt(data.getConf().get(
- Config.NIMBUS_TASK_TIMEOUT_SECS));
+ taskHBTimeout = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_TIMEOUT_SECS));
+ if (taskId == topoTasksHbInfo.get_topologyMasterId())
+ taskHBTimeout = (taskHBTimeout / 2);
if (nowSecs - nimbusTime > taskHBTimeout) {
// task is dead
long ts = ((long) nimbusTime) * 1000;
@@ -470,7 +404,7 @@ public class NimbusUtils {
sb.append(",current ");
sb.append(nowSecs);
sb.append(":").append(new Date(((long) nowSecs) * 1000));
- LOG.info(sb.toString());
+ LOG.debug(sb.toString());
return true;
}
@@ -478,13 +412,10 @@ public class NimbusUtils {
}
- public static void updateTaskHbStartTime(NimbusData data,
- Assignment assignment, String topologyId) {
- Map<Integer, TkHbCacheTime> taskHBs =
- data.getTaskHeartbeatsCache(topologyId, true);
+ public static void updateTaskHbStartTime(NimbusData data, Assignment assignment, String topologyId) {
+ Map<Integer, TkHbCacheTime> taskHBs = data.getTaskHeartbeatsCache(topologyId, true);
- Map<Integer, Integer> taskStartTimes =
- assignment.getTaskStartTimeSecs();
+ Map<Integer, Integer> taskStartTimes = assignment.getTaskStartTimeSecs();
for (Entry<Integer, Integer> entry : taskStartTimes.entrySet()) {
Integer taskId = entry.getKey();
Integer taskStartTime = entry.getValue();
@@ -501,25 +432,19 @@ public class NimbusUtils {
return;
}
- public static <T> void transitionName(NimbusData data, String topologyName,
- boolean errorOnNoTransition, StatusType transition_status,
- T... args) throws Exception {
+ public static <T> void transitionName(NimbusData data, String topologyName, boolean errorOnNoTransition, StatusType transition_status, T... args)
+ throws Exception {
StormClusterState stormClusterState = data.getStormClusterState();
- String topologyId =
- Cluster.get_topology_id(stormClusterState, topologyName);
+ String topologyId = Cluster.get_topology_id(stormClusterState, topologyName);
if (topologyId == null) {
throw new NotAliveException(topologyName);
}
- transition(data, topologyId, errorOnNoTransition, transition_status,
- args);
+ transition(data, topologyId, errorOnNoTransition, transition_status, args);
}
- public static <T> void transition(NimbusData data, String topologyid,
- boolean errorOnNoTransition, StatusType transition_status,
- T... args) {
+ public static <T> void transition(NimbusData data, String topologyid, boolean errorOnNoTransition, StatusType transition_status, T... args) {
try {
- data.getStatusTransition().transition(topologyid,
- errorOnNoTransition, transition_status, args);
+ data.getStatusTransition().transition(topologyid, errorOnNoTransition, transition_status, args);
} catch (Exception e) {
// TODO Auto-generated catch block
LOG.error("Failed to do status transition,", e);
@@ -535,22 +460,17 @@ public class NimbusUtils {
return numTasks;
}
- public static List<TopologySummary> getTopologySummary(
- StormClusterState stormClusterState,
- Map<String, Assignment> assignments) throws Exception {
- List<TopologySummary> topologySummaries =
- new ArrayList<TopologySummary>();
+ public static List<TopologySummary> getTopologySummary(StormClusterState stormClusterState, Map<String, Assignment> assignments) throws Exception {
+ List<TopologySummary> topologySummaries = new ArrayList<TopologySummary>();
// get all active topology's StormBase
- Map<String, StormBase> bases =
- Cluster.get_all_StormBase(stormClusterState);
+ Map<String, StormBase> bases = Cluster.get_all_StormBase(stormClusterState);
for (Entry<String, StormBase> entry : bases.entrySet()) {
String topologyId = entry.getKey();
StormBase base = entry.getValue();
- Assignment assignment =
- stormClusterState.assignment_info(topologyId, null);
+ Assignment assignment = stormClusterState.assignment_info(topologyId, null);
if (assignment == null) {
LOG.error("Failed to get assignment of " + topologyId);
continue;
@@ -571,11 +491,10 @@ public class NimbusUtils {
topology.set_id(topologyId);
topology.set_name(base.getStormName());
topology.set_status(base.getStatusString());
- topology.set_uptime_secs(TimeUtils.time_delta(base
- .getLanchTimeSecs()));
- topology.set_num_workers(num_workers);
- topology.set_num_tasks(num_tasks);
- topology.set_error_info(errorString);
+ topology.set_uptimeSecs(TimeUtils.time_delta(base.getLanchTimeSecs()));
+ topology.set_numWorkers(num_workers);
+ topology.set_numTasks(num_tasks);
+ topology.set_errorInfo(errorString);
topologySummaries.add(topology);
@@ -584,34 +503,26 @@ public class NimbusUtils {
return topologySummaries;
}
- public static SupervisorSummary mkSupervisorSummary(
- SupervisorInfo supervisorInfo, String supervisorId,
- Map<String, Integer> supervisorToUsedSlotNum) {
+ public static SupervisorSummary mkSupervisorSummary(SupervisorInfo supervisorInfo, String supervisorId, Map<String, Integer> supervisorToUsedSlotNum) {
Integer usedNum = supervisorToUsedSlotNum.get(supervisorId);
SupervisorSummary summary =
- new SupervisorSummary(supervisorInfo.getHostName(),
- supervisorId, supervisorInfo.getUptimeSecs(),
- supervisorInfo.getWorkerPorts().size(),
+ new SupervisorSummary(supervisorInfo.getHostName(), supervisorId, supervisorInfo.getUptimeSecs(), supervisorInfo.getWorkerPorts().size(),
usedNum == null ? 0 : usedNum);
return summary;
}
- public static List<SupervisorSummary> mkSupervisorSummaries(
- Map<String, SupervisorInfo> supervisorInfos,
- Map<String, Assignment> assignments) {
+ public static List<SupervisorSummary> mkSupervisorSummaries(Map<String, SupervisorInfo> supervisorInfos, Map<String, Assignment> assignments) {
- Map<String, Integer> supervisorToLeftSlotNum =
- new HashMap<String, Integer>();
+ Map<String, Integer> supervisorToLeftSlotNum = new HashMap<String, Integer>();
for (Entry<String, Assignment> entry : assignments.entrySet()) {
Set<ResourceWorkerSlot> workers = entry.getValue().getWorkers();
for (ResourceWorkerSlot worker : workers) {
String supervisorId = worker.getNodeId();
- SupervisorInfo supervisorInfo =
- supervisorInfos.get(supervisorId);
+ SupervisorInfo supervisorInfo = supervisorInfos.get(supervisorId);
if (supervisorInfo == null) {
continue;
}
@@ -629,9 +540,7 @@ public class NimbusUtils {
String supervisorId = entry.getKey();
SupervisorInfo supervisorInfo = entry.getValue();
- SupervisorSummary summary =
- mkSupervisorSummary(supervisorInfo, supervisorId,
- supervisorToLeftSlotNum);
+ SupervisorSummary summary = mkSupervisorSummary(supervisorInfo, supervisorId, supervisorToLeftSlotNum);
ret.add(summary);
}
@@ -648,27 +557,24 @@ public class NimbusUtils {
return ret;
}
- public static NimbusSummary getNimbusSummary(
- StormClusterState stormClusterState,
- List<SupervisorSummary> supervisorSummaries, NimbusData data)
+ public static NimbusSummary getNimbusSummary(StormClusterState stormClusterState, List<SupervisorSummary> supervisorSummaries, NimbusData data)
throws Exception {
NimbusSummary ret = new NimbusSummary();
String master = stormClusterState.get_leader_host();
NimbusStat nimbusMaster = new NimbusStat();
nimbusMaster.set_host(master);
- nimbusMaster.set_uptime_secs(String.valueOf(data.uptime()));
- ret.set_nimbus_master(nimbusMaster);
+ nimbusMaster.set_uptimeSecs(String.valueOf(data.uptime()));
+ ret.set_nimbusMaster(nimbusMaster);
List<NimbusStat> nimbusSlaveList = new ArrayList<NimbusStat>();
- ret.set_nimbus_slaves(nimbusSlaveList);
- Map<String, String> nimbusSlaveMap =
- Cluster.get_all_nimbus_slave(stormClusterState);
+ ret.set_nimbusSlaves(nimbusSlaveList);
+ Map<String, String> nimbusSlaveMap = Cluster.get_all_nimbus_slave(stormClusterState);
if (nimbusSlaveMap != null) {
for (Entry<String, String> entry : nimbusSlaveMap.entrySet()) {
NimbusStat slave = new NimbusStat();
slave.set_host(entry.getKey());
- slave.set_uptime_secs(entry.getValue());
+ slave.set_uptimeSecs(entry.getValue());
nimbusSlaveList.add(slave);
}
@@ -678,46 +584,75 @@ public class NimbusUtils {
int usedPort = 0;
for (SupervisorSummary supervisor : supervisorSummaries) {
- totalPort += supervisor.get_num_workers();
- usedPort += supervisor.get_num_used_workers();
+ totalPort += supervisor.get_numWorkers();
+ usedPort += supervisor.get_numUsedWorkers();
}
- ret.set_supervisor_num(supervisorSummaries.size());
- ret.set_total_port_num(totalPort);
- ret.set_used_port_num(usedPort);
- ret.set_free_port_num(totalPort - usedPort);
+ ret.set_supervisorNum(supervisorSummaries.size());
+ ret.set_totalPortNum(totalPort);
+ ret.set_usedPortNum(usedPort);
+ ret.set_freePortNum(totalPort - usedPort);
ret.set_version(Utils.getVersion());
return ret;
}
- public static void updateTopologyTaskTimeout(NimbusData data,
- String topologyId) {
+ public static void updateTopologyTaskTimeout(NimbusData data, String topologyId) {
Map topologyConf = null;
try {
- topologyConf =
- StormConfig.read_nimbus_topology_conf(data.getConf(),
- topologyId);
+ topologyConf = StormConfig.read_nimbus_topology_conf(data.getConf(), topologyId);
} catch (IOException e) {
- LOG.warn("Failed to read configuration of " + topologyId + ", "
- + e.getMessage());
+ LOG.warn("Failed to read configuration of " + topologyId + ", " + e.getMessage());
}
- Integer timeout =
- JStormUtils.parseInt(topologyConf
- .get(Config.NIMBUS_TASK_TIMEOUT_SECS));
+ Integer timeout = JStormUtils.parseInt(topologyConf.get(Config.NIMBUS_TASK_TIMEOUT_SECS));
if (timeout == null) {
- timeout =
- JStormUtils.parseInt(data.getConf().get(
- Config.NIMBUS_TASK_TIMEOUT_SECS));
+ timeout = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_TIMEOUT_SECS));
}
LOG.info("Setting taskTimeout:" + timeout + " for " + topologyId);
data.getTopologyTaskTimeout().put(topologyId, timeout);
}
- public static void removeTopologyTaskTimeout(NimbusData data,
- String topologyId) {
+ public static void removeTopologyTaskTimeout(NimbusData data, String topologyId) {
data.getTopologyTaskTimeout().remove(topologyId);
}
+
+ public static void updateTopologyTaskHb(NimbusData data, String topologyId) {
+ StormClusterState clusterState = data.getStormClusterState();
+ TopologyTaskHbInfo topologyTaskHb = null;
+
+ try {
+ topologyTaskHb = clusterState.topology_heartbeat(topologyId);
+ } catch (Exception e) {
+ LOG.error("updateTopologyTaskHb: Failed to get topology task heartbeat info", e);
+ }
+
+ if (topologyTaskHb != null) {
+ data.getTasksHeartbeat().put(topologyId, topologyTaskHb);
+ }
+ }
+
+ public static void removeTopologyTaskHb(NimbusData data, String topologyId, int taskId) {
+ TopologyTaskHbInfo topologyTaskHbs = data.getTasksHeartbeat().get(topologyId);
+
+ if (topologyTaskHbs != null) {
+ Map<Integer, TaskHeartbeat> taskHbs = topologyTaskHbs.get_taskHbs();
+ if (taskHbs != null) {
+ taskHbs.remove(taskId);
+ }
+ }
+ }
+
+ public static int getTopologyMasterId(Map<Integer, TaskInfo> tasksInfo) {
+ int ret = 0;
+ for (Entry<Integer, TaskInfo> entry : tasksInfo.entrySet()) {
+ if (entry.getValue().getComponentId().equalsIgnoreCase(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
+ ret = entry.getKey();
+ break;
+ }
+ }
+
+ return ret;
+ }
}
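Note on the reworked isTaskDead() above: heartbeats are now read from the in-memory tasksHeartbeat map instead of ZK, and the heartbeat timeout is halved for the topology master task. A hedged standalone sketch of just that timeout comparison follows; the numbers in main() are arbitrary illustrations, not JStorm defaults.

// Illustrative sketch of the dead-task check inside isTaskDead(): a task is
// dead once nimbus has gone longer than the timeout without seeing a new
// report time, and the topology master task uses half the configured timeout.
public class DeadTaskCheckSketch {
    static boolean isDead(int nowSecs, int nimbusTimeSecs, int taskHbTimeoutSecs,
                          boolean isTopologyMaster) {
        int timeout = isTopologyMaster ? taskHbTimeoutSecs / 2 : taskHbTimeoutSecs;
        return nowSecs - nimbusTimeSecs > timeout;
    }

    public static void main(String[] args) {
        System.out.println(isDead(300, 100, 120, false)); // true: 200s silence > 120s
        System.out.println(isDead(300, 250, 120, true));  // false: 50s silence <= 60s
    }
}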