You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:48 UTC
[10/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java
index 6b79fb4..45c353c 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java
@@ -17,6 +17,26 @@
*/
package com.alibaba.jstorm.message.netty;
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.DisruptorQueue;
+import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.common.metric.*;
+import com.alibaba.jstorm.metric.*;
+import com.alibaba.jstorm.utils.JStormServerUtils;
+import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.NetWorkUtils;
+import com.codahale.metrics.health.HealthCheck;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashSet;
@@ -29,35 +49,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.DisruptorQueue;
-import backtype.storm.utils.Utils;
-
-import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.common.metric.Histogram;
-import com.alibaba.jstorm.common.metric.Meter;
-import com.alibaba.jstorm.common.metric.QueueGauge;
-import com.alibaba.jstorm.metric.JStormHealthCheck;
-import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.metric.MetricDef;
-import com.alibaba.jstorm.utils.JStormServerUtils;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.NetWorkUtils;
-import com.codahale.metrics.health.HealthCheck;
-
class NettyClient implements IConnection {
- private static final Logger LOG = LoggerFactory
- .getLogger(NettyClient.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);
+
protected String name;
protected final int max_retries;
@@ -84,9 +78,11 @@ class NettyClient implements IConnection {
protected String address;
// doesn't use timer, due to competition
- protected Histogram sendTimer;
- protected Histogram batchSizeHistogram;
- protected Meter sendSpeed;
+ protected AsmHistogram sendTimer;
+ protected AsmHistogram batchSizeHistogram;
+ protected AsmMeter sendSpeed;
+ protected static AsmMeter totalSendSpeed = (AsmMeter) JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName(
+ MetricDef.NETTY_CLI_SEND_SPEED, MetricType.METER), new AsmMeter());
protected ReconnectRunnable reconnector;
protected ChannelFactory clientChannelFactory;
@@ -94,19 +90,19 @@ class NettyClient implements IConnection {
protected Set<Channel> closingChannel;
protected AtomicBoolean isConnecting = new AtomicBoolean(false);
-
+
protected NettyConnection nettyConnection;
-
+
protected Map stormConf;
-
+
protected boolean connectMyself;
protected Object channelClosing = new Object();
+ protected boolean enableNettyMetrics;
+
@SuppressWarnings("rawtypes")
- NettyClient(Map storm_conf, ChannelFactory factory,
- ScheduledExecutorService scheduler, String host, int port,
- ReconnectRunnable reconnector) {
+ NettyClient(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) {
this.stormConf = storm_conf;
this.factory = factory;
this.scheduler = scheduler;
@@ -116,34 +112,21 @@ class NettyClient implements IConnection {
channelRef = new AtomicReference<Channel>(null);
being_closed = new AtomicBoolean(false);
pendings = new AtomicLong(0);
-
+
nettyConnection = new NettyConnection();
- nettyConnection.setClientPort(NetWorkUtils.ip(),
- ConfigExtension.getLocalWorkerPort(storm_conf));
+ nettyConnection.setClientPort(NetWorkUtils.ip(), ConfigExtension.getLocalWorkerPort(storm_conf));
nettyConnection.setServerPort(host, port);
// Configure
- buffer_size =
- Utils.getInt(storm_conf
- .get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
- max_retries =
- Math.min(30, Utils.getInt(storm_conf
- .get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
- base_sleep_ms =
- Utils.getInt(storm_conf
- .get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
- max_sleep_ms =
- Utils.getInt(storm_conf
- .get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+ buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+ max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
+ base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+ max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
timeoutMs = ConfigExtension.getNettyPendingBufferTimeout(storm_conf);
- MAX_SEND_PENDING =
- (int) ConfigExtension.getNettyMaxSendPending(storm_conf);
+ MAX_SEND_PENDING = (int) ConfigExtension.getNettyMaxSendPending(storm_conf);
- this.messageBatchSize =
- Utils.getInt(
- storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE),
- 262144);
+ this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
messageBatchRef = new AtomicReference<MessageBatch>();
// Start the connection attempt.
@@ -152,56 +135,62 @@ class NettyClient implements IConnection {
connectMyself = isConnectMyself(stormConf, host, port);
address = JStormServerUtils.getName(host, port);
-
- if (connectMyself == false) {
+
+ this.enableNettyMetrics = MetricUtils.isEnableNettyMetrics(storm_conf);
+ LOG.info("** enable netty metrics: {}", this.enableNettyMetrics);
+ if (!connectMyself) {
registerMetrics();
}
closingChannel = new HashSet<Channel>();
}
-
+
public void registerMetrics() {
- sendTimer =
- JStormMetrics.registerWorkerHistogram(
- MetricDef.NETTY_CLI_SEND_TIME, nettyConnection.toString());
- batchSizeHistogram =
- JStormMetrics.registerWorkerHistogram(
- MetricDef.NETTY_CLI_BATCH_SIZE, nettyConnection.toString());
- sendSpeed = JStormMetrics.registerWorkerMeter(MetricDef.NETTY_CLI_SEND_SPEED,
- nettyConnection.toString());
-
- CacheGaugeHealthCheck cacheGauge =
- new CacheGaugeHealthCheck(messageBatchRef,
- MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString());
- JStormMetrics.registerWorkerGauge(cacheGauge,
- MetricDef.NETTY_CLI_CACHE_SIZE, nettyConnection.toString());
- JStormHealthCheck.registerWorkerHealthCheck(
- MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString(), cacheGauge);
-
- JStormMetrics.registerWorkerGauge(
- new com.codahale.metrics.Gauge<Double>() {
+ if (this.enableNettyMetrics) {
+ sendTimer = (AsmHistogram) JStormMetrics.registerNettyMetric(
+ MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_TIME, nettyConnection),
+ MetricType.HISTOGRAM),
+ new AsmHistogram());
+ batchSizeHistogram = (AsmHistogram) JStormMetrics.registerNettyMetric(
+ MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_BATCH_SIZE, nettyConnection),
+ MetricType.HISTOGRAM),
+ new AsmHistogram());
+ sendSpeed = (AsmMeter) JStormMetrics.registerNettyMetric(MetricUtils.nettyMetricName(
+ AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_SPEED, nettyConnection), MetricType.METER), new AsmMeter());
+
+ CacheGaugeHealthCheck cacheGauge = new CacheGaugeHealthCheck(messageBatchRef,
+ MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString());
+ JStormMetrics.registerNettyMetric(MetricUtils
+ .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_CACHE_SIZE, nettyConnection), MetricType.GAUGE),
+ new AsmGauge(cacheGauge));
+
+ JStormMetrics.registerNettyMetric(MetricUtils
+ .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_PENDING, nettyConnection), MetricType.GAUGE),
+ new AsmGauge(new com.codahale.metrics.Gauge<Double>() {
+ @Override
+ public Double getValue() {
+ return ((Long) pendings.get()).doubleValue();
+ }
+ }));
- @Override
- public Double getValue() {
- return ((Long) pendings.get()).doubleValue();
- }
- }, MetricDef.NETTY_CLI_SEND_PENDING, nettyConnection.toString());
-
- JStormHealthCheck.registerWorkerHealthCheck(
- MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString(),
+ JStormHealthCheck.registerWorkerHealthCheck(MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString(),
+ cacheGauge);
+ }
+
+ JStormHealthCheck.registerWorkerHealthCheck(MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString(),
new HealthCheck() {
- HealthCheck.Result healthy = HealthCheck.Result.healthy();
- HealthCheck.Result unhealthy = HealthCheck.Result.unhealthy
- ("NettyConnection " + nettyConnection.toString() + " is broken.");
+ Result healthy = Result.healthy();
+ Result unhealthy = Result
+ .unhealthy("NettyConnection " + nettyConnection.toString() + " is broken.");
+
@Override
protected Result check() throws Exception {
- // TODO Auto-generated method stub
if (isChannelReady() == null) {
return unhealthy;
- }else {
+ } else {
return healthy;
}
}
-
+
});
}
@@ -216,23 +205,28 @@ class NettyClient implements IConnection {
bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, stormConf));
reconnect();
}
-
+
public boolean isConnectMyself(Map conf, String host, int port) {
String localIp = NetWorkUtils.ip();
String remoteIp = NetWorkUtils.host2Ip(host);
int localPort = ConfigExtension.getLocalWorkerPort(conf);
-
- if (localPort == port &&
- localIp.equals(remoteIp)) {
+
+ if (localPort == port && localIp.equals(remoteIp)) {
return true;
}
-
+
return false;
}
+ public void notifyInterestChanged(Channel channel) {
+ if (channel.isWritable()) {
+ MessageBatch messageBatch = messageBatchRef.getAndSet(null);
+ flushRequest(channel, messageBatch);
+ }
+ }
+
/**
* The function can't be synchronized, otherwise it will be deadlock
- *
*/
public void doReconnect() {
if (channelRef.get() != null) {
@@ -255,12 +249,10 @@ class NettyClient implements IConnection {
}
long sleepMs = getSleepTimeMs();
- LOG.info("Reconnect ... [{}], {}, sleep {}ms", retries.get(), name,
- sleepMs);
+ LOG.info("Reconnect ... [{}], {}, sleep {}ms", retries.get(), name, sleepMs);
ChannelFuture future = bootstrap.connect(remote_addr);
future.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future)
- throws Exception {
+ public void operationComplete(ChannelFuture future) throws Exception {
isConnecting.set(false);
Channel channel = future.getChannel();
if (future.isSuccess()) {
@@ -269,17 +261,12 @@ class NettyClient implements IConnection {
setChannel(channel);
// handleResponse();
} else {
- LOG.info(
- "Failed to reconnect ... [{}], {}, channel = {}, cause = {}",
- retries.get(), name, channel, future.getCause());
+ LOG.info("Failed to reconnect ... [{}], {}, channel = {}, cause = {}", retries.get(), name, channel, future.getCause());
reconnect();
}
}
});
JStormUtils.sleepMs(sleepMs);
-
- return;
-
}
public void reconnect() {
@@ -290,7 +277,6 @@ class NettyClient implements IConnection {
* # of milliseconds to wait per exponential back-off policy
*/
private int getSleepTimeMs() {
-
int sleepMs = base_sleep_ms * retries.incrementAndGet();
if (sleepMs > 1000) {
sleepMs = 1000;
@@ -310,7 +296,7 @@ class NettyClient implements IConnection {
public void send(TaskMessage message) {
LOG.warn("Should be overload");
}
-
+
Channel isChannelReady() {
Channel channel = channelRef.get();
if (channel == null) {
@@ -325,26 +311,28 @@ class NettyClient implements IConnection {
return channel;
}
- protected synchronized void flushRequest(Channel channel,
- final MessageBatch requests) {
+ protected synchronized void flushRequest(Channel channel, final MessageBatch requests) {
if (requests == null || requests.isEmpty())
return;
- Double batchSize = Double.valueOf(requests.getEncoded_length());
- batchSizeHistogram.update(batchSize);
+ Long batchSize = (long) requests.getEncoded_length();
+ if (batchSizeHistogram != null) {
+ batchSizeHistogram.update(batchSize);
+ }
pendings.incrementAndGet();
- sendSpeed.update(batchSize);
+ if (sendSpeed != null) {
+ sendSpeed.update(batchSize);
+ }
+ totalSendSpeed.update(batchSize);
ChannelFuture future = channel.write(requests);
future.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future)
- throws Exception {
+ public void operationComplete(ChannelFuture future) throws Exception {
pendings.decrementAndGet();
if (!future.isSuccess()) {
Channel channel = future.getChannel();
if (isClosed() == false) {
- LOG.info("Failed to send requests to " + name + ": "
- + channel.toString() + ":", future.getCause());
+ LOG.info("Failed to send requests to " + name + ": " + channel.toString() + ":", future.getCause());
}
if (null != channel) {
@@ -357,32 +345,29 @@ class NettyClient implements IConnection {
}
});
}
-
+
public void unregisterMetrics() {
- JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_SEND_TIME,
- nettyConnection.toString());
- JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_BATCH_SIZE,
- nettyConnection.toString());
- JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_SEND_PENDING,
- nettyConnection.toString());
- JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_CACHE_SIZE,
- nettyConnection.toString());
- JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_SEND_SPEED,
- nettyConnection.toString());
-
- JStormHealthCheck
- .unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_CACHE_SIZE
- + ":" + nettyConnection.toString());
-
- JStormHealthCheck.unregisterWorkerHealthCheck(
- MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString());
+ if (this.enableNettyMetrics) {
+ JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName(
+ AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_TIME, nettyConnection), MetricType.HISTOGRAM));
+ JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName(
+ AsmMetric.mkName(MetricDef.NETTY_CLI_BATCH_SIZE, nettyConnection), MetricType.HISTOGRAM));
+ JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName(
+ AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_PENDING, nettyConnection), MetricType.GAUGE));
+ JStormMetrics.unregisterNettyMetric(MetricUtils
+ .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_CACHE_SIZE, nettyConnection), MetricType.GAUGE));
+ JStormMetrics.unregisterNettyMetric(MetricUtils
+ .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_SPEED, nettyConnection), MetricType.METER));
+ }
+ JStormHealthCheck.unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString());
+
+ JStormHealthCheck.unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString());
}
/**
* gracefully close this client.
- *
- * We will send all existing requests, and then invoke close_n_release()
- * method
+ * <p/>
+ * We will send all existing requests, and then invoke close_n_release() method
*/
public void close() {
LOG.info("Close netty connection to {}", name());
@@ -391,7 +376,7 @@ class NettyClient implements IConnection {
return;
}
- if (connectMyself == false) {
+ if (!connectMyself) {
unregisterMetrics();
}
@@ -410,17 +395,13 @@ class NettyClient implements IConnection {
final long timeoutMilliSeconds = 10 * 1000;
final long start = System.currentTimeMillis();
- LOG.info("Waiting for pending batchs to be sent with " + name()
- + "..., timeout: {}ms, pendings: {}", timeoutMilliSeconds,
- pendings.get());
+ LOG.info("Waiting for pending batchs to be sent with " + name() + "..., timeout: {}ms, pendings: {}", timeoutMilliSeconds, pendings.get());
while (pendings.get() != 0) {
try {
long delta = System.currentTimeMillis() - start;
if (delta > timeoutMilliSeconds) {
- LOG.error(
- "Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent",
- name(), pendings.get());
+ LOG.error("Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent", name(), pendings.get());
break;
}
Thread.sleep(1000); // sleep 1s
@@ -445,7 +426,7 @@ class NettyClient implements IConnection {
/**
* Avoid channel double close
- *
+ *
* @param channel
*/
void closeChannel(final Channel channel) {
@@ -461,8 +442,7 @@ class NettyClient implements IConnection {
LOG.debug(channel.toString() + " begin to closed");
ChannelFuture closeFuture = channel.close();
closeFuture.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future)
- throws Exception {
+ public void operationComplete(ChannelFuture future) throws Exception {
synchronized (channelClosing) {
closingChannel.remove(channel);
@@ -501,14 +481,9 @@ class NettyClient implements IConnection {
retries.set(0);
}
- final String oldLocalAddres =
- (oldChannel == null) ? "null" : oldChannel.getLocalAddress()
- .toString();
- String newLocalAddress =
- (newChannel == null) ? "null" : newChannel.getLocalAddress()
- .toString();
- LOG.info("Use new channel {} replace old channel {}", newLocalAddress,
- oldLocalAddres);
+ final String oldLocalAddres = (oldChannel == null) ? "null" : oldChannel.getLocalAddress().toString();
+ String newLocalAddress = (newChannel == null) ? "null" : newChannel.getLocalAddress().toString();
+ LOG.info("Use new channel {} replace old channel {}", newLocalAddress, oldLocalAddres);
// avoid one netty client use too much connection, close old one
if (oldChannel != newChannel && oldChannel != null) {
@@ -555,60 +530,56 @@ class NettyClient implements IConnection {
@Override
public Object recv(Integer taskId, int flags) {
- throw new UnsupportedOperationException(
- "recvTask: Client connection should not receive any messages");
+ throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages");
}
@Override
public void registerQueue(Integer taskId, DisruptorQueue recvQueu) {
- throw new UnsupportedOperationException(
- "recvTask: Client connection should not receive any messages");
+ throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages");
}
@Override
public void enqueue(TaskMessage message) {
- throw new UnsupportedOperationException(
- "recvTask: Client connection should not receive any messages");
+ throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages");
}
- public static class CacheGaugeHealthCheck extends HealthCheck implements
- com.codahale.metrics.Gauge<Double> {
+ public static class CacheGaugeHealthCheck extends HealthCheck implements com.codahale.metrics.Gauge<Double> {
AtomicReference<MessageBatch> messageBatchRef;
String name;
Result healthy;
- public CacheGaugeHealthCheck(
- AtomicReference<MessageBatch> messageBatchRef, String name) {
+ public CacheGaugeHealthCheck(AtomicReference<MessageBatch> messageBatchRef, String name) {
this.messageBatchRef = messageBatchRef;
this.name = name;
- this.healthy = HealthCheck.Result.healthy();
+ this.healthy = Result.healthy();
}
@Override
public Double getValue() {
- // TODO Auto-generated method stub
MessageBatch messageBatch = messageBatchRef.get();
if (messageBatch == null) {
return 0.0;
} else {
- Double ret = (double) messageBatch.getEncoded_length();
- return ret;
+ return (double) messageBatch.getEncoded_length();
}
}
@Override
protected Result check() throws Exception {
- // TODO Auto-generated method stub
Double size = getValue();
if (size > 8 * JStormUtils.SIZE_1_M) {
- return HealthCheck.Result.unhealthy(name
- + QueueGauge.QUEUE_IS_FULL);
+ return Result.unhealthy(name + QueueGauge.QUEUE_IS_FULL);
} else {
return healthy;
}
}
}
+
+ @Override
+ public boolean available() {
+ return (isChannelReady() != null);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java
index 1d582ba..9b58063 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java
@@ -17,31 +17,29 @@
*/
package com.alibaba.jstorm.message.netty;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;
-
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
class NettyClientAsync extends NettyClient {
- private static final Logger LOG = LoggerFactory
- .getLogger(NettyClientAsync.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NettyClientAsync.class);
public static final String PREFIX = "Netty-Client-";
// when batch buffer size is more than BATCH_THREASHOLD_WARN
@@ -54,7 +52,6 @@ class NettyClientAsync extends NettyClient {
protected final boolean blockSend;
boolean isDirectSend(Map conf) {
-
if (JStormServerUtils.isOnePending(conf) == true) {
return true;
}
@@ -71,22 +68,15 @@ class NettyClientAsync extends NettyClient {
}
@SuppressWarnings("rawtypes")
- NettyClientAsync(Map storm_conf, ChannelFactory factory,
- ScheduledExecutorService scheduler, String host, int port,
- ReconnectRunnable reconnector) {
+ NettyClientAsync(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) {
super(storm_conf, factory, scheduler, host, port, reconnector);
- BATCH_THREASHOLD_WARN =
- ConfigExtension.getNettyBufferThresholdSize(storm_conf);
-
+ BATCH_THREASHOLD_WARN = ConfigExtension.getNettyBufferThresholdSize(storm_conf);
blockSend = isBlockSend(storm_conf);
-
directlySend = isDirectSend(storm_conf);
flush_later = new AtomicBoolean(false);
- flushCheckInterval =
- Utils.getInt(storm_conf
- .get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
+ flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
Runnable flusher = new Runnable() {
@Override
@@ -95,13 +85,11 @@ class NettyClientAsync extends NettyClient {
}
};
long initialDelay = Math.min(1000, max_sleep_ms * max_retries);
- scheduler.scheduleAtFixedRate(flusher, initialDelay,
- flushCheckInterval, TimeUnit.MILLISECONDS);
+ scheduler.scheduleAtFixedRate(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);
clientChannelFactory = factory;
start();
-
LOG.info(this.toString());
}
@@ -109,7 +97,7 @@ class NettyClientAsync extends NettyClient {
* Enqueue a task message to be sent to server
*/
@Override
- synchronized public void send(List<TaskMessage> messages) {
+ public synchronized void send(List<TaskMessage> messages) {
// throw exception if the client is being closed
if (isClosed()) {
LOG.warn("Client is being closed, and does not take requests any more");
@@ -123,13 +111,14 @@ class NettyClientAsync extends NettyClient {
throw new RuntimeException(e);
} finally {
long end = System.nanoTime();
- sendTimer.update((end - start)/1000000.0d);
-
+ if (sendTimer != null) {
+ sendTimer.update((end - start) / TimeUtils.NS_PER_US);
+ }
}
}
@Override
- synchronized public void send(TaskMessage message) {
+ public synchronized void send(TaskMessage message) {
// throw exception if the client is being closed
if (isClosed()) {
LOG.warn("Client is being closed, and does not take requests any more");
@@ -143,7 +132,9 @@ class NettyClientAsync extends NettyClient {
throw new RuntimeException(e);
} finally {
long end = System.nanoTime();
- sendTimer.update((end - start)/1000000.0d);
+ if (sendTimer != null) {
+ sendTimer.update((end - start) / TimeUtils.NS_PER_US);
+ }
}
}
@@ -159,21 +150,17 @@ class NettyClientAsync extends NettyClient {
long now = System.currentTimeMillis();
long delt = now - begin;
if (oneSecond.check() == true) {
- LOG.warn(
- "Target server {} is unavailable, pending {}, bufferSize {}, block sending {}ms",
- name, pendings.get(), cachedSize, delt);
+ LOG.warn("Target server {} is unavailable, pending {}, bufferSize {}, block sending {}ms", name, pendings.get(), cachedSize, delt);
}
if (timeoutIntervalCheck.check() == true) {
if (messageBatchRef.get() != null) {
- LOG.warn(
- "Target server {} is unavailable, wait too much time, throw timeout message",
- name);
+ LOG.warn("Target server {} is unavailable, wait too much time, throw timeout message", name);
messageBatchRef.set(null);
}
setChannel(null);
LOG.warn("Reset channel as null");
-
+
if (blockSend == false) {
reconnect();
break;
@@ -184,12 +171,10 @@ class NettyClientAsync extends NettyClient {
JStormUtils.sleepMs(sleepMs);
if (delt > 2 * timeoutMs * 1000L && changeThreadhold == false) {
- if (channelRef.get() != null
- && BATCH_THREASHOLD_WARN >= 2 * messageBatchSize) {
+ if (channelRef.get() != null && BATCH_THREASHOLD_WARN >= 2 * messageBatchSize) {
// it is just channel isn't writable;
BATCH_THREASHOLD_WARN = BATCH_THREASHOLD_WARN / 2;
- LOG.info("Reduce BATCH_THREASHOLD_WARN to {}",
- BATCH_THREASHOLD_WARN);
+ LOG.info("Reduce BATCH_THREASHOLD_WARN to {}", BATCH_THREASHOLD_WARN);
changeThreadhold = true;
}
@@ -296,12 +281,9 @@ class NettyClientAsync extends NettyClient {
} else {
if (messageBatchRef.compareAndSet(null, messageBatch)) {
flush_later.set(true);
- }
- else
+ } else
LOG.error("MessageBatch will be lost. This should not happen.");
}
-
- return;
}
void flush() {
@@ -339,12 +321,10 @@ class NettyClientAsync extends NettyClient {
@Override
public void handleResponse() {
// do nothing
- return;
}
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java
index c239dd1..2f08957 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java
@@ -17,43 +17,35 @@
*/
package com.alibaba.jstorm.message.netty;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
-
+import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.QueueGauge;
-import com.alibaba.jstorm.metric.JStormHealthCheck;
-import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.*;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
import com.codahale.metrics.Gauge;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
class NettyClientSync extends NettyClient implements EventHandler {
- private static final Logger LOG = LoggerFactory
- .getLogger(NettyClientSync.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NettyClientSync.class);
private ConcurrentLinkedQueue<MessageBatch> batchQueue;
private DisruptorQueue disruptorQueue;
@@ -63,20 +55,14 @@ class NettyClientSync extends NettyClient implements EventHandler {
private AtomicLong emitTs = new AtomicLong(0);
@SuppressWarnings("rawtypes")
- NettyClientSync(Map storm_conf, ChannelFactory factory,
- ScheduledExecutorService scheduler, String host, int port,
- ReconnectRunnable reconnector) {
+ NettyClientSync(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) {
super(storm_conf, factory, scheduler, host, port, reconnector);
batchQueue = new ConcurrentLinkedQueue<MessageBatch>();
- WaitStrategy waitStrategy =
- (WaitStrategy) JStormUtils
- .createDisruptorWaitStrategy(storm_conf);
+ WaitStrategy waitStrategy = (WaitStrategy) Utils.newInstance((String) storm_conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY));
- disruptorQueue =
- DisruptorQueue.mkInstance(name, ProducerType.MULTI,
- MAX_SEND_PENDING * 8, waitStrategy);
+ disruptorQueue = DisruptorQueue.mkInstance(name, ProducerType.MULTI, MAX_SEND_PENDING * 8, waitStrategy);
disruptorQueue.consumerStarted();
if (connectMyself == false) {
@@ -93,21 +79,14 @@ class NettyClientSync extends NettyClient implements EventHandler {
scheduler.scheduleAtFixedRate(trigger, 10, 1, TimeUnit.SECONDS);
/**
- * In sync mode, it can't directly use common factory, it will occur
- * problem when client close and restart
+ * In sync mode, it can't directly use common factory, it will occur problem when client close and restart
*/
- ThreadFactory bossFactory =
- new NettyRenameThreadFactory(MetricDef.NETTY_CLI
- + JStormServerUtils.getName(host, port) + "-boss");
+ ThreadFactory bossFactory = new NettyRenameThreadFactory(MetricDef.NETTY_CLI + JStormServerUtils.getName(host, port) + "-boss");
bossExecutor = Executors.newCachedThreadPool(bossFactory);
- ThreadFactory workerFactory =
- new NettyRenameThreadFactory(MetricDef.NETTY_CLI
- + JStormServerUtils.getName(host, port) + "-worker");
+ ThreadFactory workerFactory = new NettyRenameThreadFactory(MetricDef.NETTY_CLI + JStormServerUtils.getName(host, port) + "-worker");
workerExecutor = Executors.newCachedThreadPool(workerFactory);
- clientChannelFactory =
- new NioClientSocketChannelFactory(bossExecutor, workerExecutor,
- 1);
+ clientChannelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor, 1);
start();
@@ -115,24 +94,24 @@ class NettyClientSync extends NettyClient implements EventHandler {
}
public void registerSyncMetrics() {
- JStormMetrics.registerWorkerGauge(new Gauge<Double>() {
- @Override
- public Double getValue() {
- return Double.valueOf(batchQueue.size());
- }
- }, MetricDef.NETTY_CLI_SYNC_BATCH_QUEUE, nettyConnection.toString());
-
- QueueGauge cacheQueueGauge =
- new QueueGauge(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE
- + nettyConnection.toString(), disruptorQueue);
-
- JStormMetrics
- .registerWorkerGauge(cacheQueueGauge,
- MetricDef.NETTY_CLI_SYNC_DISR_QUEUE,
- nettyConnection.toString());
- JStormHealthCheck.registerWorkerHealthCheck(
- MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + ":"
- + nettyConnection.toString(), cacheQueueGauge);
+ if (enableNettyMetrics) {
+ JStormMetrics.registerNettyMetric(MetricUtils
+ .nettyMetricName(MetricDef.NETTY_CLI_SYNC_BATCH_QUEUE + nettyConnection.toString(), MetricType.GAUGE),
+ new AsmGauge(new Gauge<Double>() {
+ @Override
+ public Double getValue() {
+ return (double) batchQueue.size();
+ }
+ }));
+
+ QueueGauge cacheQueueGauge = new QueueGauge(disruptorQueue, MetricDef.NETTY_CLI_SYNC_DISR_QUEUE, nettyConnection.toString());
+
+ JStormMetrics.registerNettyMetric(MetricUtils
+ .nettyMetricName(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + nettyConnection.toString(), MetricType.GAUGE),
+ new AsmGauge(cacheQueueGauge));
+ JStormHealthCheck.registerWorkerHealthCheck(
+ MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + ":" + nettyConnection.toString(), cacheQueueGauge);
+ }
}
/**
@@ -166,8 +145,6 @@ class NettyClientSync extends NettyClient implements EventHandler {
/**
* Don't take care of competition
- *
- * @param blocked
*/
public void sendData() {
long start = System.nanoTime();
@@ -188,12 +165,13 @@ class NettyClientSync extends NettyClient implements EventHandler {
JStormUtils.halt_process(-1, err);
} finally {
long end = System.nanoTime();
- sendTimer.update((end - start) / 1000000.0d);
+ if (sendTimer != null) {
+ sendTimer.update((end - start) / TimeUtils.NS_PER_US);
+ }
}
}
public void sendAllData() {
-
long start = System.nanoTime();
try {
disruptorQueue.consumeBatch(this);
@@ -216,7 +194,9 @@ class NettyClientSync extends NettyClient implements EventHandler {
JStormUtils.halt_process(-1, err);
} finally {
long end = System.nanoTime();
- sendTimer.update((end - start) / 1000000.0d);
+ if (sendTimer != null) {
+ sendTimer.update((end - start) / TimeUtils.NS_PER_US);
+ }
}
}
@@ -227,8 +207,7 @@ class NettyClientSync extends NettyClient implements EventHandler {
}
@Override
- public void onEvent(Object event, long sequence, boolean endOfBatch)
- throws Exception {
+ public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
if (event == null) {
return;
}
@@ -296,22 +275,19 @@ class NettyClientSync extends NettyClient implements EventHandler {
}
public void unregisterSyncMetrics() {
- JStormMetrics.unregisterWorkerMetric(
- MetricDef.NETTY_CLI_SYNC_BATCH_QUEUE,
- nettyConnection.toString());
- JStormMetrics
- .unregisterWorkerMetric(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE,
- nettyConnection.toString());
- JStormHealthCheck
- .unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE
- + ":" + nettyConnection.toString());
+ if (enableNettyMetrics) {
+ JStormMetrics.unregisterNettyMetric(MetricUtils
+ .nettyMetricName(MetricDef.NETTY_CLI_SYNC_BATCH_QUEUE + nettyConnection.toString(), MetricType.GAUGE));
+ JStormMetrics.unregisterNettyMetric(MetricUtils
+ .nettyMetricName(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + nettyConnection.toString(), MetricType.GAUGE));
+ JStormHealthCheck
+ .unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + ":" + nettyConnection.toString());
+ }
}
@Override
public void close() {
- LOG.info(
- "Begin to close connection to {} and flush all data, batchQueue {}, disruptor {}",
- name, batchQueue.size(), disruptorQueue.population());
+ LOG.info("Begin to close connection to {} and flush all data, batchQueue {}, disruptor {}", name, batchQueue.size(), disruptorQueue.population());
sendAllData();
disruptorQueue.haltWithInterrupt();
if (connectMyself == false) {
@@ -326,7 +302,6 @@ class NettyClientSync extends NettyClient implements EventHandler {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java
index cd8c0fa..4f2358f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java
@@ -21,23 +21,23 @@ import java.io.Serializable;
import com.alibaba.jstorm.utils.NetWorkUtils;
-public class NettyConnection implements Serializable{
+public class NettyConnection implements Serializable {
protected String clientPort;
protected String serverPort;
-
+
public String getClientPort() {
return clientPort;
}
-
+
public void setClientPort(String client, int port) {
String ip = NetWorkUtils.host2Ip(client);
clientPort = ip + ":" + port;
}
-
+
public String getServerPort() {
return serverPort;
}
-
+
public void setServerPort(String server, int port) {
String ip = NetWorkUtils.host2Ip(server);
serverPort = ip + ":" + port;
@@ -47,12 +47,8 @@ public class NettyConnection implements Serializable{
public int hashCode() {
final int prime = 31;
int result = 1;
- result =
- prime * result
- + ((clientPort == null) ? 0 : clientPort.hashCode());
- result =
- prime * result
- + ((serverPort == null) ? 0 : serverPort.hashCode());
+ result = prime * result + ((clientPort == null) ? 0 : clientPort.hashCode());
+ result = prime * result + ((serverPort == null) ? 0 : serverPort.hashCode());
return result;
}
@@ -77,15 +73,14 @@ public class NettyConnection implements Serializable{
return false;
return true;
}
-
+
@Override
public String toString() {
- return clientPort + "->" + serverPort;
+ return clientPort + "->" + serverPort;
}
-
- public static String mkString(String client, int clientPort,
- String server, int serverPort) {
+
+ public static String mkString(String client, int clientPort, String server, int serverPort) {
return client + ":" + clientPort + "->" + server + ":" + serverPort;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java
index a6ddd9a..1a090f2 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java
@@ -40,8 +40,7 @@ import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.utils.JStormUtils;
public class NettyContext implements IContext {
- private final static Logger LOG = LoggerFactory
- .getLogger(NettyContext.class);
+ private final static Logger LOG = LoggerFactory.getLogger(NettyContext.class);
@SuppressWarnings("rawtypes")
private Map storm_conf;
@@ -65,36 +64,19 @@ public class NettyContext implements IContext {
public void prepare(Map storm_conf) {
this.storm_conf = storm_conf;
- int maxWorkers =
- Utils.getInt(storm_conf
- .get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
- ThreadFactory bossFactory =
- new NettyRenameThreadFactory(MetricDef.NETTY_CLI + "boss");
- ThreadFactory workerFactory =
- new NettyRenameThreadFactory(MetricDef.NETTY_CLI + "worker");
+ int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+ ThreadFactory bossFactory = new NettyRenameThreadFactory(MetricDef.NETTY_CLI + "boss");
+ ThreadFactory workerFactory = new NettyRenameThreadFactory(MetricDef.NETTY_CLI + "worker");
if (maxWorkers > 0) {
clientChannelFactory =
- new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory),
- maxWorkers);
+ new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory), maxWorkers);
} else {
- clientChannelFactory =
- new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory));
+ clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory));
}
- int otherWorkers =
- Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1;
- int poolSize =
- Math.min(Math.max(1, otherWorkers),
- MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
- clientScheduleService =
- Executors
- .newScheduledThreadPool(poolSize,
- new NettyRenameThreadFactory(
- "client-schedule-service"));
+ int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1;
+ int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
+ clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
reconnector = new ReconnectRunnable();
new AsyncLoopThread(reconnector, true, Thread.MIN_PRIORITY, true);
@@ -119,11 +101,9 @@ public class NettyContext implements IContext {
@Override
public IConnection connect(String topology_id, String host, int port) {
if (isSyncMode == true) {
- return new NettyClientSync(storm_conf, clientChannelFactory,
- clientScheduleService, host, port, reconnector);
+ return new NettyClientSync(storm_conf, clientChannelFactory, clientScheduleService, host, port, reconnector);
} else {
- return new NettyClientAsync(storm_conf, clientChannelFactory,
- clientScheduleService, host, port, reconnector);
+ return new NettyClientAsync(storm_conf, clientChannelFactory, clientScheduleService, host, port, reconnector);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java
index 2e060c2..5d38fc5 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java
@@ -27,8 +27,7 @@ public class NettyRenameThreadFactory implements ThreadFactory {
static {
// Rename Netty threads
- ThreadRenamingRunnable
- .setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+ ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
}
final ThreadGroup group;
@@ -37,15 +36,12 @@ public class NettyRenameThreadFactory implements ThreadFactory {
NettyRenameThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
- group =
- (s != null) ? s.getThreadGroup() : Thread.currentThread()
- .getThreadGroup();
+ group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.name = name;
}
public Thread newThread(Runnable r) {
- Thread t =
- new Thread(group, r, name + "-" + index.getAndIncrement(), 0);
+ Thread t = new Thread(group, r, name + "-" + index.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java
index d00b24f..a5fa859 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java
@@ -44,21 +44,19 @@ import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.client.ConfigExtension;
class NettyServer implements IConnection {
- private static final Logger LOG = LoggerFactory
- .getLogger(NettyServer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class);
@SuppressWarnings("rawtypes")
Map storm_conf;
int port;
// private LinkedBlockingQueue message_queue;
- volatile ChannelGroup allChannels =
- new DefaultChannelGroup("jstorm-server");
+ volatile ChannelGroup allChannels = new DefaultChannelGroup("jstorm-server");
final ChannelFactory factory;
final ServerBootstrap bootstrap;
// ayncBatch is only one solution, so directly set it as true
private final boolean isSyncMode;
-
+
private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues;
@SuppressWarnings("rawtypes")
@@ -69,30 +67,17 @@ class NettyServer implements IConnection {
this.deserializeQueues = deserializeQueues;
// Configure the server.
- int buffer_size =
- Utils.getInt(storm_conf
- .get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
- int maxWorkers =
- Utils.getInt(storm_conf
- .get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
+ int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+ int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
// asyncBatch = ConfigExtension.isNettyTransferAsyncBatch(storm_conf);
- ThreadFactory bossFactory =
- new NettyRenameThreadFactory("server" + "-boss");
- ThreadFactory workerFactory =
- new NettyRenameThreadFactory("server" + "-worker");
+ ThreadFactory bossFactory = new NettyRenameThreadFactory("server" + "-boss");
+ ThreadFactory workerFactory = new NettyRenameThreadFactory("server" + "-worker");
if (maxWorkers > 0) {
- factory =
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory),
- maxWorkers);
+ factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory), maxWorkers);
} else {
- factory =
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory));
+ factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory));
}
bootstrap = new ServerBootstrap(factory);
@@ -108,8 +93,7 @@ class NettyServer implements IConnection {
Channel channel = bootstrap.bind(new InetSocketAddress(port));
allChannels.add(channel);
- LOG.info("Successfull bind {}, buffer_size:{}, maxWorkers:{}", port,
- buffer_size, maxWorkers);
+ LOG.info("Successfull bind {}, buffer_size:{}, maxWorkers:{}", port, buffer_size, maxWorkers);
}
@Override
@@ -129,8 +113,7 @@ class NettyServer implements IConnection {
DisruptorQueue queue = deserializeQueues.get(task);
if (queue == null) {
- LOG.debug("Received invalid message directed at port " + task
- + ". Dropping...");
+ LOG.debug("Received invalid message directed at port " + task + ". Dropping...");
return;
}
@@ -138,8 +121,7 @@ class NettyServer implements IConnection {
}
/**
- * fetch a message from message queue synchronously (flags != 1) or
- * asynchronously (flags==1)
+ * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
*/
public Object recv(Integer taskId, int flags) {
try {
@@ -211,14 +193,12 @@ class NettyServer implements IConnection {
@Override
public void send(List<TaskMessage> messages) {
- throw new UnsupportedOperationException(
- "Server connection should not send any messages");
+ throw new UnsupportedOperationException("Server connection should not send any messages");
}
@Override
public void send(TaskMessage message) {
- throw new UnsupportedOperationException(
- "Server connection should not send any messages");
+ throw new UnsupportedOperationException("Server connection should not send any messages");
}
@Override
@@ -231,4 +211,8 @@ class NettyServer implements IConnection {
return isSyncMode;
}
+ @Override
+ public boolean available() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java
index dcf2a5d..f5ec324 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java
@@ -26,11 +26,9 @@ import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.callback.RunnableCallback;
public class ReconnectRunnable extends RunnableCallback {
- private static final Logger LOG = LoggerFactory
- .getLogger(ReconnectRunnable.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ReconnectRunnable.class);
- private BlockingQueue<NettyClient> queue =
- new LinkedBlockingDeque<NettyClient>();
+ private BlockingQueue<NettyClient> queue = new LinkedBlockingDeque<NettyClient>();
public void pushEvent(NettyClient client) {
queue.offer(client);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java
index f84c2f0..7b511e9 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java
@@ -30,8 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StormClientHandler extends SimpleChannelUpstreamHandler {
- private static final Logger LOG = LoggerFactory
- .getLogger(StormClientHandler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
private NettyClient client;
private AtomicBoolean being_closed;
@@ -41,16 +40,25 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
}
/**
- * Sometime when connect one bad channel which isn't writable, it will call
- * this function
+ * @@@ Comment this function
+ *
+ * Don't allow call from low netty layer, whose call will try to obtain the lock of jstorm netty layer
+ * otherwise it will lead to deadlock
+ */
+// @Override
+// public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+//
+// client.notifyInterestChanged(e.getChannel());
+// }
+
+ /**
+ * Sometime when connect one bad channel which isn't writable, it will call this function
*/
@Override
- public void channelConnected(ChannelHandlerContext ctx,
- ChannelStateEvent event) {
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
// register the newly established channel
Channel channel = event.getChannel();
- LOG.info("connection established to :{}, local port:{}",
- client.getRemoteAddr(), channel.getLocalAddress());
+ LOG.info("connection established to :{}, local port:{}", client.getRemoteAddr(), channel.getLocalAddress());
client.handleResponse();
}
@@ -63,8 +71,8 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
/**
*
- * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#exceptionCaught(org.jboss.netty.channel.ChannelHandlerContext,
- * org.jboss.netty.channel.ExceptionEvent)
+ * @see SimpleChannelUpstreamHandler#exceptionCaught(ChannelHandlerContext,
+ * ExceptionEvent)
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
@@ -82,14 +90,12 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
/**
* Attention please,
*
- * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#channelDisconnected(org.jboss.netty.channel.ChannelHandlerContext,
- * org.jboss.netty.channel.ChannelStateEvent)
+ * @see SimpleChannelUpstreamHandler#channelDisconnected(ChannelHandlerContext,
+ * ChannelStateEvent)
*/
@Override
- public void channelDisconnected(ChannelHandlerContext ctx,
- ChannelStateEvent e) throws Exception {
- LOG.info("Receive channelDisconnected to {}, channel = {}",
- client.getRemoteAddr(), e.getChannel());
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ LOG.info("Receive channelDisconnected to {}, channel = {}", client.getRemoteAddr(), e.getChannel());
// ctx.sendUpstream(e);
super.channelDisconnected(ctx, e);
@@ -97,10 +103,8 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
}
@Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
- throws Exception {
- LOG.info("Connection to {} has been closed, channel = {}",
- client.getRemoteAddr(), e.getChannel());
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ LOG.info("Connection to {} has been closed, channel = {}", client.getRemoteAddr(), e.getChannel());
super.channelClosed(ctx, e);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java
index 080f91c..8927809 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java
@@ -27,12 +27,12 @@ import com.alibaba.jstorm.client.ConfigExtension;
class StormClientPipelineFactory implements ChannelPipelineFactory {
private NettyClient client;
- private Map conf;
+ private Map conf;
StormClientPipelineFactory(NettyClient client, Map conf) {
this.client = client;
this.conf = conf;
-
+
}
public ChannelPipeline getPipeline() throws Exception {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java
index 916ce93..c1b9cf2 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java
@@ -33,8 +33,7 @@ import org.slf4j.LoggerFactory;
import backtype.storm.messaging.TaskMessage;
class StormServerHandler extends SimpleChannelUpstreamHandler {
- private static final Logger LOG = LoggerFactory
- .getLogger(StormServerHandler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
private NettyServer server;
private Map<Channel, Integer> failureCounters;
@@ -71,29 +70,28 @@ class StormServerHandler extends SimpleChannelUpstreamHandler {
LOG.info("Connection established {}", e.getChannel().getRemoteAddress());
server.addChannel(e.getChannel());
}
-
+
@Override
- public void childChannelClosed(
- ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception {
+ public void childChannelClosed(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception {
super.childChannelClosed(ctx, e);
LOG.info("Connection closed {}", e.getChildChannel().getRemoteAddress());
-
+
MessageDecoder.removeTransmitHistogram(e.getChildChannel());
}
-
+
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelDisconnected(ctx, e);
LOG.info("Connection channelDisconnected {}", e.getChannel().getRemoteAddress());
-
+
MessageDecoder.removeTransmitHistogram(e.getChannel());
};
-
+
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelClosed(ctx, e);
LOG.info("Connection channelClosed {}", e.getChannel().getRemoteAddress());
-
+
MessageDecoder.removeTransmitHistogram(e.getChannel());
};
@@ -131,8 +129,7 @@ class StormServerHandler extends SimpleChannelUpstreamHandler {
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
// removeFailureCounter(e.getChannel());
if (e.getChannel() != null) {
- LOG.info("Channel occur exception {}", e.getChannel()
- .getRemoteAddress());
+ LOG.info("Channel occur exception {}", e.getChannel().getRemoteAddress());
}
server.closeChannel(e.getChannel());
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java
index 9dead91..6489d4f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java
@@ -26,7 +26,7 @@ import org.jboss.netty.channel.Channels;
class StormServerPipelineFactory implements ChannelPipelineFactory {
private NettyServer server;
private Map conf;
-
+
StormServerPipelineFactory(NettyServer server, Map conf) {
this.server = server;
this.conf = conf;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AlimonitorClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AlimonitorClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AlimonitorClient.java
deleted file mode 100755
index 760e538..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AlimonitorClient.java
+++ /dev/null
@@ -1,267 +0,0 @@
-package com.alibaba.jstorm.metric;
-
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.NameValuePair;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.util.EntityUtils;
-import org.apache.log4j.Logger;
-
-import backtype.storm.utils.Utils;
-
-public class AlimonitorClient extends MetricSendClient {
-
- public static Logger LOG = Logger.getLogger(AlimonitorClient.class);
-
- // Send to localhost:15776 by default
- public static final String DEFAUT_ADDR = "127.0.0.1";
- public static final String DEFAULT_PORT = "15776";
- public static final int DEFAUTL_FLAG = 0;
- public static final String DEFAULT_ERROR_INFO = "";
-
- private final String COLLECTION_FLAG = "collection_flag";
- private final String ERROR_INFO = "error_info";
- private final String MSG = "MSG";
-
- private String port;
- private String requestIP;
- private String monitorName;
- private int collectionFlag;
- private String errorInfo;
-
- private boolean post;
-
- public AlimonitorClient() {
- }
-
- public AlimonitorClient(String requestIP, String port, boolean post) {
- this.requestIP = requestIP;
- this.port = port;
- this.post = post;
- this.monitorName = null;
- this.collectionFlag = 0;
- this.errorInfo = null;
- }
-
- public void setIpAddr(String ipAddr) {
- this.requestIP = ipAddr;
- }
-
- public void setPort(String port) {
- this.port = port;
- }
-
- public void setMonitorName(String monitorName) {
- this.monitorName = monitorName;
- }
-
- public void setCollectionFlag(int flag) {
- this.collectionFlag = flag;
- }
-
- public void setErrorInfo(String msg) {
- this.errorInfo = msg;
- }
-
- public void setPostFlag(boolean post) {
- this.post = post;
- }
-
- public String buildURL() {
- return "http://" + requestIP + ":" + port + "/passive";
- }
-
- public String buildRqstAddr() {
- return "http://" + requestIP + ":" + port + "/passive?name="
- + monitorName + "&msg=";
- }
-
- @Override
- public boolean send(Map<String, Object> msg) {
- try {
- if (monitorName == null) {
- LOG.warn("monitor name is null");
- return false;
- }
- return sendRequest(collectionFlag, errorInfo, msg);
- } catch (Exception e) {
- LOG.error("Failed to sendRequest", e);
- return false;
- }
- }
-
- @Override
- public boolean send(List<Map<String, Object>> msg) {
- try {
- if (monitorName == null) {
- LOG.warn("monitor name is null");
- return false;
- }
- return sendRequest(collectionFlag, errorInfo, msg);
- } catch (Exception e) {
- LOG.error("Failed to sendRequest", e);
- return false;
- }
- }
-
- public Map buildAliMonitorMsg(int collection_flag, String error_message) {
- // Json format of the message sent to Alimonitor
- // {
- // "collection_flag":int,
- // "error_info":string,
- // "MSG": ojbect | array
- // }
- Map ret = new HashMap();
- ret.put(COLLECTION_FLAG, collection_flag);
- ret.put(ERROR_INFO, error_message);
- ret.put(MSG, null);
-
- return ret;
- }
-
- private void addMsgData(Map jsonObj, Map<String, Object> map) {
- jsonObj.put(MSG, map);
- }
-
- private void addMsgData(Map jsonObj, List<Map<String, Object>> mapList) {
- // JSONArray jsonArray = new JSONArray();
- // for(Map<String, Object> map : mapList) {
- // jsonArray.add(map);
- // }
-
- jsonObj.put(MSG, mapList);
- }
-
- private boolean sendRequest(int collection_flag, String error_message,
- Map<String, Object> msg) throws Exception {
- boolean ret = false;
-
- if (msg.size() == 0)
- return ret;
-
- Map jsonObj = buildAliMonitorMsg(collection_flag, error_message);
- addMsgData(jsonObj, msg);
- String jsonMsg = jsonObj.toString();
- LOG.info(jsonMsg);
-
- if (post == true) {
- String url = buildURL();
- ret = httpPost(url, jsonMsg);
- } else {
- String request = buildRqstAddr();
- StringBuilder postAddr = new StringBuilder();
- postAddr.append(request);
- postAddr.append(URLEncoder.encode(jsonMsg));
-
- ret = httpGet(postAddr);
- }
-
- return ret;
- }
-
- private boolean sendRequest(int collection_flag, String error_message,
- List<Map<String, Object>> msgList) throws Exception {
- boolean ret = false;
-
- if (msgList.size() == 0)
- return ret;
-
- Map jsonObj = buildAliMonitorMsg(collection_flag, error_message);
- addMsgData(jsonObj, msgList);
-
- String jsonMsg = Utils.to_json(jsonObj);
- LOG.info(jsonMsg);
-
- if (post == true) {
- String url = buildURL();
- ret = httpPost(url, jsonMsg);
- } else {
- String request = buildRqstAddr();
- StringBuilder postAddr = new StringBuilder();
- postAddr.append(request);
- postAddr.append(URLEncoder.encode(jsonMsg));
-
- ret = httpGet(postAddr);
- }
-
- return ret;
- }
-
- private boolean httpGet(StringBuilder postAddr) {
- boolean ret = false;
-
- CloseableHttpClient httpClient = HttpClientBuilder.create().build();
- CloseableHttpResponse response = null;
-
- try {
- HttpGet request = new HttpGet(postAddr.toString());
- response = httpClient.execute(request);
- HttpEntity entity = response.getEntity();
- if (entity != null) {
- LOG.info(EntityUtils.toString(entity));
- }
- EntityUtils.consume(entity);
- ret = true;
- } catch (Exception e) {
- LOG.error("Exception when sending http request to alimonitor", e);
- } finally {
- try {
- if (response != null)
- response.close();
- httpClient.close();
- } catch (Exception e) {
- LOG.error("Exception when closing httpclient", e);
- }
- }
-
- return ret;
- }
-
- private boolean httpPost(String url, String msg) {
- boolean ret = false;
-
- CloseableHttpClient httpClient = HttpClientBuilder.create().build();
- CloseableHttpResponse response = null;
-
- try {
- HttpPost request = new HttpPost(url);
- List<NameValuePair> nvps = new ArrayList<NameValuePair>();
- nvps.add(new BasicNameValuePair("name", monitorName));
- nvps.add(new BasicNameValuePair("msg", msg));
- request.setEntity(new UrlEncodedFormEntity(nvps));
- response = httpClient.execute(request);
- HttpEntity entity = response.getEntity();
- if (entity != null) {
- LOG.info(EntityUtils.toString(entity));
- }
- EntityUtils.consume(entity);
- ret = true;
- } catch (Exception e) {
- LOG.error("Exception when sending http request to alimonitor", e);
- } finally {
- try {
- if (response != null)
- response.close();
- httpClient.close();
- } catch (Exception e) {
- LOG.error("Exception when closing httpclient", e);
- }
- }
-
- return ret;
- }
-
- public void close() {
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricFilter.java
new file mode 100644
index 0000000..4313b6f
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricFilter.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.metric;
+
+import com.alibaba.jstorm.common.metric.AsmMetric;
+
+import java.io.Serializable;
+
+/**
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public interface AsmMetricFilter extends Serializable {
+    /**
+     * Decides whether a metric passes this filter.
+     *
+     * @param name   the metric name
+     * @param metric the metric
+     * @return {@code true} when the metric is accepted, {@code false} when it is rejected
+     */
+    boolean matches(String name, AsmMetric metric);
+
+    /**
+     * A filter that accepts every metric, whatever its name or type.
+     */
+    AsmMetricFilter ALL = new AsmMetricFilter() {
+        private static final long serialVersionUID = 7089987006352295530L;
+
+        @Override
+        public boolean matches(String name, AsmMetric metric) {
+            return true;
+        }
+    };
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricRegistry.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricRegistry.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricRegistry.java
new file mode 100644
index 0000000..710da9d
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricRegistry.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.metric;
+
+import com.alibaba.jstorm.common.metric.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Metric registry. Generally, the methods of this class should not be exposed directly; the wrapper methods in {@code JStormMonitorMetrics} should be called instead.
+ *
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class AsmMetricRegistry implements AsmMetricSet {
+ private static final long serialVersionUID = 8184106900230111064L;
+ private static final Logger LOG = LoggerFactory.getLogger(AsmMetricRegistry.class);
+
+ protected final ConcurrentMap<String, AsmMetric> metrics = new ConcurrentHashMap<String, AsmMetric>();
+
+ public int size() {
+ return metrics.size();
+ }
+
+ /**
+ * Given a {@link com.alibaba.jstorm.common.metric.old.window.Metric}, registers it under the given name.
+ *
+ * @param name the metric node
+ * @param metric the metric
+ * @param <T> the type of the metric
+ * @return {@code metric}
+ * @throws IllegalArgumentException if the name is already registered
+ */
+ @SuppressWarnings("unchecked")
+ public <T extends AsmMetric> AsmMetric register(String name, T metric) throws IllegalArgumentException {
+ metric.setMetricName(name);
+ final AsmMetric existing = metrics.putIfAbsent(name, metric);
+ if (existing == null) {
+ LOG.info("Successfully register metric of {}", name);
+ return metric;
+ } else {
+ LOG.warn("duplicate metric: {}", name);
+ return existing;
+ }
+ }
+
+ /**
+ * Removes the metric with the given name.
+ *
+ * @param name the metric node
+ * @return whether or not the metric was removed
+ */
+ public boolean remove(String name) {
+ final AsmMetric metric = metrics.remove(name);
+ if (metric != null) {
+ LOG.info("Successfully unregister metric of {}", name);
+ return true;
+ }
+ return false;
+ }
+
+ public AsmMetric getMetric(String name) {
+ return metrics.get(name);
+ }
+
+ /**
+ * Returns a set of the names of all the metrics in the registry.
+ *
+ * @return the names of all the metrics
+ */
+ public SortedSet<String> getMetricNames() {
+ return Collections.unmodifiableSortedSet(new TreeSet<String>(metrics.keySet()));
+ }
+
+ /**
+ * Returns a map of all the gauges in the registry and their names.
+ *
+ * @return all the gauges in the registry
+ */
+ public SortedMap<String, AsmGauge> getGauges() {
+ return getGauges(AsmMetricFilter.ALL);
+ }
+
+ /**
+ * Returns a map of all the gauges in the registry and their names which match the given filter.
+ *
+ * @param filter the metric filter to match
+ * @return all the gauges in the registry
+ */
+ public SortedMap<String, AsmGauge> getGauges(AsmMetricFilter filter) {
+ return getMetrics(AsmGauge.class, filter);
+ }
+
+ /**
+ * Returns a map of all the counters in the registry and their names.
+ *
+ * @return all the counters in the registry
+ */
+ public SortedMap<String, AsmCounter> getCounters() {
+ return getCounters(AsmMetricFilter.ALL);
+ }
+
+ /**
+ * Returns a map of all the counters in the registry and their names which match the given filter.
+ *
+ * @param filter the metric filter to match
+ * @return all the counters in the registry
+ */
+ public SortedMap<String, AsmCounter> getCounters(AsmMetricFilter filter) {
+ return getMetrics(AsmCounter.class, filter);
+ }
+
+ /**
+ * Returns a map of all the histograms in the registry and their names.
+ *
+ * @return all the histograms in the registry
+ */
+ public SortedMap<String, AsmHistogram> getHistograms() {
+ return getHistograms(AsmMetricFilter.ALL);
+ }
+
+ /**
+ * Returns a map of all the histograms in the registry and their names which match the given filter.
+ *
+ * @param filter the metric filter to match
+ * @return all the histograms in the registry
+ */
+ public SortedMap<String, AsmHistogram> getHistograms(AsmMetricFilter filter) {
+ return getMetrics(AsmHistogram.class, filter);
+ }
+
+ /**
+ * Returns a map of all the meters in the registry and their names.
+ *
+ * @return all the meters in the registry
+ */
+ public SortedMap<String, AsmMeter> getMeters() {
+ return getMeters(AsmMetricFilter.ALL);
+ }
+
+ /**
+ * Returns a map of all the meters in the registry and their names which match the given filter.
+ *
+ * @param filter the metric filter to match
+ * @return all the meters in the registry
+ */
+ public SortedMap<String, AsmMeter> getMeters(AsmMetricFilter filter) {
+ return getMetrics(AsmMeter.class, filter);
+ }
+
+ /**
+ * Returns a map of all the timers in the registry and their names.
+ *
+ * @return all the timers in the registry
+ */
+ public SortedMap<String, AsmTimer> getTimers() {
+ return getTimers(AsmMetricFilter.ALL);
+ }
+
+ /**
+ * Returns a map of all the timers in the registry and their names which match the given filter.
+ *
+ * @param filter the metric filter to match
+ * @return all the timers in the registry
+ */
+ public SortedMap<String, AsmTimer> getTimers(AsmMetricFilter filter) {
+ return getMetrics(AsmTimer.class, filter);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T extends AsmMetric> SortedMap<String, T> getMetrics(Class<T> klass, AsmMetricFilter filter) {
+ final TreeMap<String, T> timers = new TreeMap<String, T>();
+ for (Map.Entry<String, AsmMetric> entry : metrics.entrySet()) {
+ if (klass.isInstance(entry.getValue()) && filter.matches(entry.getKey(), entry.getValue())) {
+ timers.put(entry.getKey(), (T) entry.getValue());
+ }
+ }
+ return timers;
+ }
+
+ @Override
+ public Map<String, AsmMetric> getMetrics() {
+ return metrics;
+ }
+
+}