You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:36 UTC
[57/60] [abbrv] storm git commit: remove jstorm-utility directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/pom.xml
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/pom.xml b/jstorm-utility/ons/pom.xml
deleted file mode 100755
index fc8f06a..0000000
--- a/jstorm-utility/ons/pom.xml
+++ /dev/null
@@ -1,101 +0,0 @@
-<?xml version="1.0"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>com.taobao</groupId>
- <artifactId>parent</artifactId>
- <version>1.0.2</version>
- </parent>
-
- <!--
- <parent>
- <groupId>com.alibaba.aloha</groupId>
- <artifactId>aloha-utility</artifactId>
- <version>0.2.0-SNAPSHOT</version>
- </parent> -->
- <groupId>com.alibaba.aloha</groupId>
- <artifactId>ons</artifactId>
- <version>0.2.0</version>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <properties>
- <jstorm.version>0.9.7</jstorm.version>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-client</artifactId>
- <version>${jstorm.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-client-extension</artifactId>
- <version>${jstorm.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-server</artifactId>
- <version>${jstorm.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>com.aliyun.openservices</groupId>
- <artifactId>ons-client</artifactId>
- <version>1.1.5</version>
- </dependency>
- <!--
- <dependency>
- <groupId>com.alibaba.rocketmq</groupId>
- <artifactId>rocketmq-common</artifactId>
- <version>3.0.1</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>3.0.1</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.rocketmq</groupId>
- <artifactId>rocketmq-remoting</artifactId>
- <version>3.0.1</version>
- </dependency>
- -->
-
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/LoadConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/LoadConfig.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/LoadConfig.java
deleted file mode 100755
index 6f062eb..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/LoadConfig.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package com.alibaba.jstorm;
-
-import org.yaml.snakeyaml.Yaml;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-public class LoadConfig {
- public static final String TOPOLOGY_TYPE = "topology.type";
-
- private static Map LoadProperty(String prop) {
- Map ret = null;
- Properties properties = new Properties();
-
- try {
- InputStream stream = new FileInputStream(prop);
- properties.load(stream);
- ret = new HashMap<Object, Object>();
- ret.putAll(properties);
- } catch (FileNotFoundException e) {
- System.out.println("No such file " + prop);
- } catch (Exception e1) {
- e1.printStackTrace();
- }
-
- return ret;
- }
-
- private static Map LoadYaml(String confPath) {
- Map ret = null;
- Yaml yaml = new Yaml();
-
- try {
- InputStream stream = new FileInputStream(confPath);
-
- ret = (Map) yaml.load(stream);
- if (ret == null || ret.isEmpty() == true) {
- throw new RuntimeException("Failed to read config file");
- }
-
- } catch (FileNotFoundException e) {
- System.out.println("No such file " + confPath);
- throw new RuntimeException("No config file");
- } catch (Exception e1) {
- e1.printStackTrace();
- throw new RuntimeException("Failed to read config file");
- }
-
- return ret;
- }
-
- public static Map LoadConf(String arg) {
- Map ret = null;
-
- if (arg.endsWith("yaml")) {
- ret = LoadYaml(arg);
- } else {
- ret = LoadProperty(arg);
- }
-
- return ret;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/TestTopology.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/TestTopology.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/TestTopology.java
deleted file mode 100755
index 9ad17de..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/TestTopology.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.alibaba.jstorm;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-
-import com.alibaba.jstorm.ons.consumer.ConsumerSpout;
-import com.alibaba.jstorm.ons.producer.ProducerBolt;
-import com.alibaba.jstorm.utils.JStormUtils;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class TestTopology {
-
- private static Map conf = new HashMap<Object, Object>();
-
- public static void main(String[] args) throws Exception {
- if (args.length == 0) {
- System.err.println("Please input configuration file");
- System.exit(-1);
- }
-
- conf = LoadConfig.LoadConf(args[0]);
-
- TopologyBuilder builder = setupBuilder();
-
- submitTopology(builder);
-
- }
-
- private static TopologyBuilder setupBuilder() throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
-
- int writerParallel = JStormUtils.parseInt(conf.get("topology.producer.parallel"), 1);
-
- int spoutParallel = JStormUtils.parseInt(conf.get("topology.consumer.parallel"), 1);
-
- builder.setSpout("OnsConsumer", new ConsumerSpout(), spoutParallel);
-
- builder.setBolt("OnsProducer", new ProducerBolt(), writerParallel).localFirstGrouping("OnsConsumer");
-
- return builder;
- }
-
- private static void submitTopology(TopologyBuilder builder) {
- try {
- if (local_mode(conf)) {
-
- LocalCluster cluster = new LocalCluster();
-
- cluster.submitTopology(String.valueOf(conf.get("topology.name")), conf, builder.createTopology());
-
- Thread.sleep(200000);
-
- cluster.shutdown();
- } else {
- StormSubmitter.submitTopology(String.valueOf(conf.get("topology.name")), conf,
- builder.createTopology());
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public static boolean local_mode(Map conf) {
- String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
- if (mode != null) {
- if (mode.equals("local")) {
- return true;
- }
- }
-
- return false;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsConfig.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsConfig.java
deleted file mode 100644
index c8a9b63..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsConfig.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package com.alibaba.jstorm.ons;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
-
-public class OnsConfig implements Serializable{
-
- private static final long serialVersionUID = -3911741873533333336L;
-
- private final String topic;
- private final String subExpress;
- private final String accessKey;
- private final String secretKey;
-
- public OnsConfig(Map conf) {
- topic = (String)conf.get("Topic");
- if (conf.get("SubExpress") != null) {
- subExpress = (String)conf.get("SubExpress");
- }else {
- subExpress = "*";
- }
- accessKey = (String)conf.get(PropertyKeyConst.AccessKey);
- secretKey = (String)conf.get(PropertyKeyConst.SecretKey);
-
- checkValid();
-
- }
-
- public void checkValid() {
- if (StringUtils.isBlank(topic) == true) {
- throw new RuntimeException("Topic hasn't been set");
- }else if (StringUtils.isBlank(subExpress)) {
- throw new RuntimeException("SubExpress hasn't been set");
- }else if (StringUtils.isBlank(accessKey)) {
- throw new RuntimeException(PropertyKeyConst.AccessKey + " hasn't been set");
- }else if (StringUtils.isBlank(secretKey)) {
- throw new RuntimeException(PropertyKeyConst.SecretKey + " hasn't been set");
- }
-
- }
-
- public String getTopic() {
- return topic;
- }
-
- public String getSubExpress() {
- return subExpress;
- }
-
- public String getAccessKey() {
- return accessKey;
- }
-
- public String getSecretKey() {
- return secretKey;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsTuple.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsTuple.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsTuple.java
deleted file mode 100644
index c3a3e5d..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsTuple.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.alibaba.jstorm.ons;
-
-import com.aliyun.openservices.ons.api.Message;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-import java.io.Serializable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class OnsTuple implements Serializable {
-
- /** */
- private static final long serialVersionUID = 2277714452693486955L;
-
- protected final Message message;
-
- protected final AtomicInteger failureTimes;
- protected final long createMs;
- protected long emitMs;
-
- protected transient CountDownLatch latch;
- protected transient boolean isSuccess;
-
- public OnsTuple(Message message) {
- this.message = message;
-
- this.failureTimes = new AtomicInteger(0);
- this.createMs = System.currentTimeMillis();
-
- this.latch = new CountDownLatch(1);
- this.isSuccess = false;
- }
-
- public AtomicInteger getFailureTimes() {
- return failureTimes;
- }
-
- public long getCreateMs() {
- return createMs;
- }
-
- public long getEmitMs() {
- return emitMs;
- }
-
- public void updateEmitMs() {
- this.emitMs = System.currentTimeMillis();
- }
-
- public Message getMessage() {
- return message;
- }
-
- public boolean waitFinish() throws InterruptedException {
- return latch.await(4, TimeUnit.HOURS);
- }
-
- public void done() {
- isSuccess = true;
- latch.countDown();
- }
-
- public void fail() {
- isSuccess = false;
- latch.countDown();
- }
-
- public boolean isSuccess() {
- return isSuccess;
- }
-
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerConfig.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerConfig.java
deleted file mode 100644
index 00fac77..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerConfig.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.alibaba.jstorm.ons.consumer;
-
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.alibaba.jstorm.ons.OnsConfig;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
-
-public class ConsumerConfig extends OnsConfig{
-
- private static final long serialVersionUID = 4292162795544528064L;
- private final String consumerId;
- private final int consumerThreadNum;
-
-
- private final String nameServer;
-
-
- public ConsumerConfig(Map conf) {
- super(conf);
-
- consumerId = (String)conf.get(PropertyKeyConst.ConsumerId);
- if (StringUtils.isBlank(consumerId)) {
- throw new RuntimeException(PropertyKeyConst.ConsumerId + " hasn't been set");
- }
- consumerThreadNum = JStormUtils.parseInt(
- conf.get(PropertyKeyConst.ConsumeThreadNums), 4);
-
- nameServer = (String)conf.get(PropertyKeyConst.NAMESRV_ADDR);
- if (nameServer != null) {
- String namekey = "rocketmq.namesrv.domain";
-
- String value = System.getProperty(namekey);
- if (value == null) {
-
- System.setProperty(namekey, nameServer);
- } else if (value.equals(nameServer) == false) {
- throw new RuntimeException("Different nameserver address in the same worker " + value + ":"
- + nameServer);
-
- }
- }
-
- }
-
-
- public String getConsumerId() {
- return consumerId;
- }
-
-
- public int getConsumerThreadNum() {
- return consumerThreadNum;
- }
-
-
- public String getNameServer() {
- return nameServer;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerFactory.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerFactory.java
deleted file mode 100644
index b16a5c6..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.alibaba.jstorm.ons.consumer;
-
-import com.aliyun.openservices.ons.api.Consumer;
-import com.aliyun.openservices.ons.api.MessageListener;
-import com.aliyun.openservices.ons.api.ONSFactory;
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
-import org.apache.log4j.Logger;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-public class ConsumerFactory {
-
- private static final Logger LOG = Logger.getLogger(ConsumerFactory.class);
-
- public static Map<String, Consumer> consumers = new HashMap<String, Consumer>();
-
- public static synchronized Consumer mkInstance(ConsumerConfig consumerConfig, MessageListener listener) throws Exception {
-
-
- String consumerId = consumerConfig.getConsumerId();
- Consumer consumer = consumers.get(consumerId);
- if (consumer != null) {
-
- LOG.info("Consumer of " + consumerId + " has been created, don't recreate it ");
-
- // Attention, this place return null to info duplicated consumer
- return null;
- }
-
- Properties properties = new Properties();
- properties.put(PropertyKeyConst.AccessKey, consumerConfig.getAccessKey());
- properties.put(PropertyKeyConst.SecretKey, consumerConfig.getSecretKey());
- properties.put(PropertyKeyConst.ConsumerId, consumerId);
- properties.put(PropertyKeyConst.ConsumeThreadNums, consumerConfig.getConsumerThreadNum());
- consumer = ONSFactory.createConsumer(properties);
-
- consumer.subscribe(consumerConfig.getTopic(), consumerConfig.getSubExpress(), listener);
- consumer.start();
-
- consumers.put(consumerId, consumer);
- LOG.info("Successfully create " + consumerId + " consumer");
-
- return consumer;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerSpout.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerSpout.java
deleted file mode 100644
index b32186d..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerSpout.java
+++ /dev/null
@@ -1,268 +0,0 @@
-package com.alibaba.jstorm.ons.consumer;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import com.alibaba.jstorm.client.metric.MetricClient;
-import com.alibaba.jstorm.client.spout.IAckValueSpout;
-import com.alibaba.jstorm.client.spout.IFailValueSpout;
-import com.alibaba.jstorm.metric.JStormHistogram;
-import com.alibaba.jstorm.ons.OnsTuple;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.aliyun.openservices.ons.api.*;
-import org.apache.log4j.Logger;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-public class ConsumerSpout implements IRichSpout, IAckValueSpout, IFailValueSpout, MessageListener {
- /** */
- private static final long serialVersionUID = 8476906628618859716L;
- private static final Logger LOG = Logger.getLogger(ConsumerSpout.class);
-
- public static final String ONS_SPOUT_FLOW_CONTROL = "OnsSpoutFlowControl";
- public static final String ONS_SPOUT_AUTO_ACK = "OnsSpoutAutoAck";
- public static final String ONS_MSG_MAX_FAIL_TIMES = "OnsMsgMaxFailTimes";
-
- protected SpoutOutputCollector collector;
- protected transient Consumer consumer;
- protected transient ConsumerConfig consumerConfig;
-
- protected Map conf;
- protected String id;
- protected boolean flowControl;
- protected boolean autoAck;
- protected long maxFailTimes;
- protected boolean active = true;
-
- protected transient LinkedBlockingDeque<OnsTuple> sendingQueue;
-
- protected transient MetricClient metricClient;
- protected transient JStormHistogram waithHistogram;
- protected transient JStormHistogram processHistogram;
-
-
- public ConsumerSpout() {
-
- }
-
-
- public void initMetricClient(TopologyContext context) {
- metricClient = new MetricClient(context);
- waithHistogram = metricClient.registerHistogram("OnsTupleWait", null);
- processHistogram = metricClient.registerHistogram("OnsTupleProcess", null);
- }
-
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.conf = conf;
- this.collector = collector;
- this.id = context.getThisComponentId() + ":" + context.getThisTaskId();
- this.sendingQueue = new LinkedBlockingDeque<OnsTuple>();
-
- this.flowControl = JStormUtils.parseBoolean(conf.get(ONS_SPOUT_FLOW_CONTROL), true);
- this.autoAck = JStormUtils.parseBoolean(conf.get(ONS_SPOUT_AUTO_ACK), false);
- this.maxFailTimes = JStormUtils.parseLong(conf.get(ONS_MSG_MAX_FAIL_TIMES), 5);
-
- StringBuilder sb = new StringBuilder();
- sb.append("Begin to init MetaSpout:").append(id);
- sb.append(", flowControl:").append(flowControl);
- sb.append(", autoAck:").append(autoAck);
- LOG.info(sb.toString());
-
- initMetricClient(context);
-
- try {
- consumerConfig = new ConsumerConfig(conf);
- consumer = ConsumerFactory.mkInstance(consumerConfig, this);
- }
- catch (Exception e) {
- LOG.error("Failed to create Meta Consumer ", e);
- throw new RuntimeException("Failed to create MetaConsumer" + id, e);
- }
-
- if (consumer == null) {
- LOG.warn(id + " already exist consumer in current worker, don't need to fetch data ");
-
- new Thread(new Runnable() {
-
- @Override
- public void run() {
- while (true) {
- try {
- Thread.sleep(10000);
- }
- catch (InterruptedException e) {
- break;
- }
-
- StringBuilder sb = new StringBuilder();
- sb.append("Only on meta consumer can be run on one process,");
- sb.append(" but there are mutliple spout consumes with the same topic@groupid meta, so the second one ");
- sb.append(id).append(" do nothing ");
- LOG.info(sb.toString());
- }
- }
- }).start();
- }
-
- LOG.info("Successfully init " + id);
- }
-
-
- @Override
- public void close() {
- if (consumer != null && active == true) {
- active = false;
- consumer.shutdown();
-
- }
- }
-
-
- @Override
- public void activate() {
- if (consumer != null && active == false) {
- active = true;
- consumer.start();
- }
-
- }
-
-
- @Override
- public void deactivate() {
- if (consumer != null && active == true) {
- active = false;
- consumer.shutdown();
- }
- }
-
-
- public void sendTuple(OnsTuple OnsTuple) {
- OnsTuple.updateEmitMs();
- collector.emit(new Values(OnsTuple), OnsTuple.getCreateMs());
- }
-
-
- @Override
- public void nextTuple() {
- OnsTuple OnsTuple = null;
- try {
- OnsTuple = sendingQueue.take();
- }
- catch (InterruptedException e) {
- }
-
- if (OnsTuple == null) {
- return;
- }
-
- sendTuple(OnsTuple);
-
- }
-
-
- @Deprecated
- public void ack(Object msgId) {
- LOG.warn("Shouldn't go this function");
- }
-
-
- @Deprecated
- public void fail(Object msgId) {
- LOG.warn("Shouldn't go this function");
- }
-
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("OnsTuple"));
- }
-
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
-
- @Override
- public void fail(Object msgId, List<Object> values) {
- OnsTuple OnsTuple = (OnsTuple) values.get(0);
- AtomicInteger failTimes = OnsTuple.getFailureTimes();
-
- int failNum = failTimes.incrementAndGet();
- if (failNum > maxFailTimes) {
- LOG.warn("Message " + OnsTuple.getMessage().getMsgID() + " fail times " + failNum);
- finishTuple(OnsTuple);
- return;
- }
-
- if (flowControl) {
- sendingQueue.offer(OnsTuple);
- }
- else {
- sendTuple(OnsTuple);
- }
- }
-
-
- public void finishTuple(OnsTuple OnsTuple) {
- waithHistogram.update(OnsTuple.getEmitMs() - OnsTuple.getCreateMs());
- processHistogram.update(System.currentTimeMillis() - OnsTuple.getEmitMs());
- OnsTuple.done();
- }
-
-
- @Override
- public void ack(Object msgId, List<Object> values) {
- OnsTuple OnsTuple = (OnsTuple) values.get(0);
- finishTuple(OnsTuple);
- }
-
-
- public Consumer getConsumer() {
- return consumer;
- }
-
-
- @Override
- public Action consume(Message message, ConsumeContext context) {
- try {
- OnsTuple OnsTuple = new OnsTuple(message);
-
- if (flowControl) {
- sendingQueue.offer(OnsTuple);
- }
- else {
- sendTuple(OnsTuple);
- }
-
- if (autoAck) {
- return Action.CommitMessage;
- }
- else {
- OnsTuple.waitFinish();
- if (OnsTuple.isSuccess() == true) {
- return Action.CommitMessage;
- }
- else {
- return Action.ReconsumeLater;
- }
- }
-
- }
- catch (Exception e) {
- LOG.error("Failed to emit " + id, e);
- return Action.ReconsumeLater;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerBolt.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerBolt.java
deleted file mode 100644
index 2a65e54..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerBolt.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package com.alibaba.jstorm.ons.producer;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-
-import com.alibaba.jstorm.ons.OnsTuple;
-import com.alibaba.jstorm.utils.RunCounter;
-import com.aliyun.openservices.ons.api.Message;
-import com.aliyun.openservices.ons.api.Producer;
-import com.aliyun.openservices.ons.api.SendResult;
-import org.apache.log4j.Logger;
-
-import java.util.Map;
-
-
-public class ProducerBolt implements IRichBolt {
-
- private static final long serialVersionUID = 2495121976857546346L;
-
- private static final Logger LOG = Logger.getLogger(ProducerBolt.class);
-
- protected OutputCollector collector;
- protected ProducerConfig producerConfig;
- protected Producer producer;
- protected RunCounter runCounter;
-
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- this.collector = collector;
- this.runCounter = new RunCounter(ProducerBolt.class);
- this.producerConfig = new ProducerConfig(stormConf);
- try {
- this.producer = ProducerFactory.mkInstance(producerConfig);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- throw new RuntimeException(e);
- }
-
- }
-
- public void execute(Tuple tuple) {
- // TODO Auto-generated method stub
- OnsTuple msgTuple = (OnsTuple)tuple.getValue(0);
- long before = System.currentTimeMillis();
- SendResult sendResult = null;
- try {
- Message msg = new Message(
- producerConfig.getTopic(),
- producerConfig.getSubExpress(),
- //Message Body
- //Any binary data; ONS does not interfere with it, so the Producer and Consumer must agree on a consistent serialization and deserialization format
- msgTuple.getMessage().getBody());
-
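- // Set the business key attribute that identifies the message; make it as globally unique as possible.
- // This lets you look up and resend the message through the ONS Console if it is not received normally.
- // Note: leaving it unset does not affect normal sending and receiving of messages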
- if (msgTuple.getMessage().getKey() != null) {
- msg.setKey(msgTuple.getMessage().getKey());
- }
- //Send the message; as long as no exception is thrown, the send succeeded
- sendResult = producer.send(msg);
-
- LOG.info("Success send msg of " + msgTuple.getMessage().getMsgID());
- runCounter.count(System.currentTimeMillis() - before);
- } catch (Exception e) {
- LOG.error("Failed to send message, SendResult:" + sendResult + "\n", e);
- runCounter.count(System.currentTimeMillis() - before);
- collector.fail(tuple);
- return ;
- //throw new FailedException(e);
- }
-
- collector.ack(tuple);
- }
-
- public void cleanup() {
- // TODO Auto-generated method stub
- ProducerFactory.rmInstance(producerConfig.getProducerId());
- producer = null;
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // TODO Auto-generated method stub
-
- }
-
- public Map<String, Object> getComponentConfiguration() {
- // TODO Auto-generated method stub
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerConfig.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerConfig.java
deleted file mode 100644
index 3ac7fb6..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerConfig.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.alibaba.jstorm.ons.producer;
-
-import java.util.Map;
-
-import com.alibaba.jstorm.ons.OnsConfig;
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
-
-public class ProducerConfig extends OnsConfig{
-
- private static final long serialVersionUID = 1532254745626913230L;
-
- private final String producerId ;
-
- public ProducerConfig(Map conf) {
- super(conf);
-
- producerId = (String)conf.get(PropertyKeyConst.ProducerId);
- if (producerId == null) {
- throw new RuntimeException(PropertyKeyConst.ProducerId + " hasn't been set");
- }
-
-
- }
-
- public String getProducerId() {
- return producerId;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerFactory.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerFactory.java
deleted file mode 100644
index 203805d..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerFactory.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package com.alibaba.jstorm.ons.producer;
-
-import com.aliyun.openservices.ons.api.Consumer;
-import com.aliyun.openservices.ons.api.MessageListener;
-import com.aliyun.openservices.ons.api.ONSFactory;
-import com.aliyun.openservices.ons.api.Producer;
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
-import org.apache.log4j.Logger;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-public class ProducerFactory {
-
- private static final Logger LOG = Logger.getLogger(ProducerFactory.class);
-
- public static Map<String, Producer> producers = new HashMap<String, Producer>();
-
- public static synchronized Producer mkInstance(ProducerConfig producerConfig) throws Exception{
-
- String producerId = producerConfig.getProducerId();
- Producer producer = producers.get(producerId);
- if (producer != null) {
-
- LOG.info("Producer of " + producerId + " has been created, don't recreate it ");
- return producer;
- }
-
- Properties properties = new Properties();
- properties.put(PropertyKeyConst.ProducerId, producerConfig.getProducerId());
- properties.put(PropertyKeyConst.AccessKey, producerConfig.getAccessKey());
- properties.put(PropertyKeyConst.SecretKey, producerConfig.getSecretKey());
-
- producer = ONSFactory.createProducer(properties);
- producer.start();
-
-
- producers.put(producerId, producer);
- LOG.info("Successfully create " + producerId + " producer");
-
- return producer;
-
- }
-
- public static synchronized void rmInstance(String producerId) {
- Producer producer = producers.remove(producerId);
- if (producer == null) {
-
- LOG.info("Producer of " + producerId + " has already been shutdown ");
- return ;
- }
-
- producer.shutdown();
- LOG.info("Producer of " + producerId + " has been shutdown ");
- return ;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/test/main/resources/metaspout.yaml
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/test/main/resources/metaspout.yaml b/jstorm-utility/ons/test/main/resources/metaspout.yaml
deleted file mode 100755
index f007772..0000000
--- a/jstorm-utility/ons/test/main/resources/metaspout.yaml
+++ /dev/null
@@ -1,32 +0,0 @@
-
-#Meta Client Configuration
-# Please refer MetaClientConfig for every setting's details
-meta.topic: "bbl_user"
-meta.consumer.group: "bbl_user"
-meta.subexpress: "*"
-#meta.nameserver: ""
-#meta.pull.interval.ms: 0
-#meta.max.fail.times: 5
-#meta.internal.queue.size: 256
-#meta.batch.send.msg.size: 16
-#meta.batch.pull.msg.size: 32
-#meta.pull.thread.num: 4
-#meta.spout.auto.ack: false
-#meta.spout.flow.contro: true
-#yyyyMMddHHmmss
-meta.consumer.start.timestamp: "20141011000000"
-#meta.extra.properties:
-
-topology.name: test_meta_spout
-topology.version: 1.0.0
-topology.workers: 5
-topology.max.spout.pending: 10
-topology.acker.executors: 1
-
-topology.debug: false
-topology.debug.recv.tuple: false
-storm.cluster.mode: local
-
-topology.spout.parallel: 2
-topology.writer.parallel: 1
-
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/rocket-mq
----------------------------------------------------------------------
diff --git a/jstorm-utility/rocket-mq b/jstorm-utility/rocket-mq
deleted file mode 160000
index 372e9d8..0000000
--- a/jstorm-utility/rocket-mq
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 372e9d87667272e6da7b5501d6d7dd2bad41ce6f
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/.gitignore
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/.gitignore b/jstorm-utility/topology-monitor/.gitignore
deleted file mode 100755
index 2bf102a..0000000
--- a/jstorm-utility/topology-monitor/.gitignore
+++ /dev/null
@@ -1,13 +0,0 @@
-# Lines that start with '#' are comments.
-*~
-*.diff
-*#
-.classpath
-.project
-.settings
-bin
-*.class
-.eclipse
-target
-*.iml
-*.versionsBackup
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/README.md
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/README.md b/jstorm-utility/topology-monitor/README.md
deleted file mode 100755
index ad4f892..0000000
--- a/jstorm-utility/topology-monitor/README.md
+++ /dev/null
@@ -1,2 +0,0 @@
-storm-util
-==========
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/pom.xml b/jstorm-utility/topology-monitor/pom.xml
deleted file mode 100755
index 714b3e3..0000000
--- a/jstorm-utility/topology-monitor/pom.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.dianping.cosmos</groupId>
- <artifactId>storm-util</artifactId>
- <packaging>jar</packaging>
- <version>1.3-SNAPSHOT</version>
- <name>storm-util</name>
- <url>http://maven.apache.org</url>
- <dependencies>
- <dependency>
- <groupId>com.dianping</groupId>
- <artifactId>blackhole-consumer</artifactId>
- <version>2.0.5</version>
- </dependency>
- <dependency>
- <groupId>com.dianping</groupId>
- <artifactId>blackhole-common</artifactId>
- <version>2.0.5</version>
- </dependency>
- <dependency>
- <groupId>redis.clients</groupId>
- <artifactId>jedis</artifactId>
- <version>2.4.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>0.9.1-incubating</version>
- <exclusions>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- </exclusions>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.dianping.puma</groupId>
- <artifactId>puma-client</artifactId>
- <version>0.1.1</version>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.14</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.5.11</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.5.11</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>3.8.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.dianping.cat</groupId>
- <artifactId>cat-core</artifactId>
- <version>1.0.5</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.2</version>
- <executions>
- <execution>
- <goals>
- <goal>jar-no-fork</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <distributionManagement>
- <repository>
- <id>dianping.repo</id>
- <name>Dian Ping internal repository for released artifacts</name>
- <url>http://mvn.dianpingoa.com/dianping-releases</url>
- </repository>
- <snapshotRepository>
- <id>dianping.repo.snapshots</id>
- <name>mvn.dianpingoa.com-snapshots</name>
- <url>http://mvn.dianpingoa.com/dianping-snapshots</url>
- </snapshotRepository>
- </distributionManagement>
-<repositories>
- <repository>
- <id>clojars</id>
- <url>http://clojars.org/repo/</url>
- </repository>
-</repositories>
-
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeBlockingQueueSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeBlockingQueueSpout.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeBlockingQueueSpout.java
deleted file mode 100755
index 50e870a..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeBlockingQueueSpout.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package com.dianping.cosmos;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.metric.api.CountMetric;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import com.dianping.cosmos.util.CatMetricUtil;
-import com.dianping.cosmos.util.Constants;
-import com.dianping.lion.client.LionException;
-import com.dp.blackhole.consumer.Consumer;
-import com.dp.blackhole.consumer.ConsumerConfig;
-import com.dp.blackhole.consumer.MessageStream;
-
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class BlackholeBlockingQueueSpout implements IRichSpout {
- private static final long serialVersionUID = 386827585122587595L;
- public static final Logger LOG = LoggerFactory.getLogger(BlackholeBlockingQueueSpout.class);
- private SpoutOutputCollector collector;
- private String topic;
- private String group;
- private MessageStream stream;
- private Consumer consumer;
- private MessageFetcher fetchThread;
- private int warnningStep = 0;
- private transient CountMetric _spoutMetric;
-
- public BlackholeBlockingQueueSpout(String topic, String group) {
- this.topic = topic;
- this.group = group;
- }
-
- @Override
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector _collector) {
- collector = _collector;
- _spoutMetric = new CountMetric();
- context.registerMetric(CatMetricUtil.getSpoutMetricName(topic, group),
- _spoutMetric, Constants.EMIT_FREQUENCY_IN_SECONDS);
-
- ConsumerConfig config = new ConsumerConfig();
- try {
- consumer = new Consumer(topic, group, config);
- } catch (LionException e) {
- throw new RuntimeException(e);
- }
- consumer.start();
- stream = consumer.getStream();
-
- fetchThread = new MessageFetcher(stream);
- new Thread(fetchThread).start();
- }
-
- @Override
- public void close() {
- fetchThread.shutdown();
- }
-
- @Override
- public void activate() {
-
- }
-
- @Override
- public void deactivate() {
- }
-
- @Override
- public void nextTuple() {
- String message = fetchThread.pollMessage();
- if (message != null) {
- collector.emit(topic, new Values(message));
- _spoutMetric.incr();
- } else {
- Utils.sleep(100);
- warnningStep++;
- if (warnningStep % 100 == 0) {
- LOG.warn("Queue is empty, cannot poll message.");
- }
- }
- }
-
- @Override
- public void ack(Object msgId) {
- LOG.debug("ack: " + msgId);
-
- }
-
- @Override
- public void fail(Object msgId) {
- LOG.info("fail: " + msgId);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream(topic, new Fields("event"));
- }
-
- @Override
- public Map getComponentConfiguration(){
- Map<String, Object> conf = new HashMap<String, Object>();
- return conf;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeSpout.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeSpout.java
deleted file mode 100755
index 86dfffb..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeSpout.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.dianping.cosmos;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.metric.api.CountMetric;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import com.dianping.cosmos.util.CatMetricUtil;
-import com.dianping.cosmos.util.Constants;
-import com.dianping.lion.client.LionException;
-import com.dp.blackhole.consumer.Consumer;
-import com.dp.blackhole.consumer.ConsumerConfig;
-import com.dp.blackhole.consumer.MessageStream;
-
-@SuppressWarnings({"rawtypes"})
-public class BlackholeSpout implements IRichSpout {
- private static final long serialVersionUID = 1L;
-
- public static final Logger LOG = LoggerFactory.getLogger(BlackholeSpout.class);
-
- private SpoutOutputCollector collector;
- private String topic;
- private String group;
- private MessageStream stream;
- private Consumer consumer;
- private transient CountMetric _spoutMetric;
-
- public BlackholeSpout(String topic, String group) {
- this.topic = topic;
- this.group = group;
- }
-
- @Override
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector _collector) {
- collector = _collector;
- _spoutMetric = new CountMetric();
- context.registerMetric(CatMetricUtil.getSpoutMetricName(topic, group),
- _spoutMetric, Constants.EMIT_FREQUENCY_IN_SECONDS);
-
- ConsumerConfig config = new ConsumerConfig();
- try {
- consumer = new Consumer(topic, group, config);
- } catch (LionException e) {
- throw new RuntimeException(e);
- }
- consumer.start();
- stream = consumer.getStream();
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public void activate() {
- }
-
- @Override
- public void deactivate() {
- }
-
- @Override
- public void nextTuple() {
- for (String message : stream) {
- collector.emit(topic, new Values(message));
- _spoutMetric.incr();
- }
- }
-
- @Override
- public void ack(Object msgId) {
- LOG.debug("ack: " + msgId);
-
- }
-
- @Override
- public void fail(Object msgId) {
- LOG.info("fail: " + msgId);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream(topic, new Fields("event"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/MessageFetcher.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/MessageFetcher.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/MessageFetcher.java
deleted file mode 100755
index b379dd3..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/MessageFetcher.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package com.dianping.cosmos;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.dp.blackhole.consumer.MessageStream;
-
-public class MessageFetcher implements Runnable {
- public static final Logger LOG = LoggerFactory.getLogger(MessageFetcher.class);
- private final int MAX_QUEUE_SIZE = 1000;
- private final int TIME_OUT = 5000;
-
- private BlockingQueue<String> emitQueue;
- private MessageStream stream;
-
- private volatile boolean running;
- public MessageFetcher(MessageStream stream) {
- this.running = true;
- this.stream = stream;
- this.emitQueue = new LinkedBlockingQueue<String>(MAX_QUEUE_SIZE);
- }
-
- @Override
- public void run() {
- while (running) {
- for (String message : stream) {
- try {
- while(!emitQueue.offer(message, TIME_OUT, TimeUnit.MILLISECONDS)) {
- LOG.error("Queue is full, cannot offer message.");
- }
- } catch (InterruptedException e) {
- LOG.error("Thread Interrupted");
- running = false;
- }
- }
- }
- }
-
- public String pollMessage() {
- return emitQueue.poll();
- }
-
- public void shutdown() {
- this.running = false;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/PumaSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/PumaSpout.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/PumaSpout.java
deleted file mode 100755
index fb1ff2a..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/PumaSpout.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package com.dianping.cosmos;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import com.dianping.puma.api.ConfigurationBuilder;
-import com.dianping.puma.api.EventListener;
-import com.dianping.puma.api.PumaClient;
-import com.dianping.puma.core.event.ChangedEvent;
-import com.dianping.puma.core.event.RowChangedEvent;
-
-
-public class PumaSpout implements IRichSpout{
- public static final Logger LOG = LoggerFactory.getLogger(PumaSpout.class);
-
- private SpoutOutputCollector collector;
- private PumaEventListener listener;
- private BlockingQueue<RowChangedEvent> receiveQueue;
- private Map<String, RowChangedEvent> waitingForAck;
-
- private Map<String, String[]> watchTables;
- private String pumaHost;
- private int pumaPort;
- private String pumaName;
- private String pumaTarget;
- private int pumaServerId;
- private String pumaSeqFileBase;
-
- public PumaSpout(String host, int port, String name, String target, HashMap<String, String[]> tables) {
- this(host, port, name, target, tables, null);
- }
-
- public PumaSpout(String host, int port, String name, String target, HashMap<String, String[]> tables, String seqFileBase) {
- this(host, port, name, target, tables, 9999, seqFileBase);
- }
-
- public PumaSpout(String host, int port, String name, String target, HashMap<String, String[]> tables, int serverId, String seqFileBase) {
- pumaHost = host;
- pumaPort = port;
- pumaName = name;
- pumaTarget = target;
- watchTables = tables;
- pumaServerId = serverId;
- pumaSeqFileBase = seqFileBase;
- }
-
- protected static String getMsgId(RowChangedEvent e) {
- return e.getBinlogServerId() + "." + e.getBinlog() + "." + e.getBinlogPos();
- }
-
- protected static String getStreamId(RowChangedEvent e) {
- return e.getDatabase() + "." + e.getTable();
- }
-
- class PumaEventListener implements EventListener {
-
- @Override
- public void onEvent(ChangedEvent event) throws Exception {
- if (!(event instanceof RowChangedEvent)) {
- LOG.error("received event " + event +" which is not a RowChangedEvent");
- return;
- }
- RowChangedEvent e = (RowChangedEvent)event;
- receiveQueue.add(e);
- }
-
- @Override
- public boolean onException(ChangedEvent event, Exception e) {
- return false;
- }
-
- @Override
- public void onConnectException(Exception e) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void onConnected() {
- LOG.info("pumaspout connected");
- }
-
- @Override
- public void onSkipEvent(ChangedEvent event) {
- // TODO Auto-generated method stub
-
- }
-
- }
-
- @Override
- public void ack(Object msgId) {
- LOG.debug("ack: " + msgId);
- waitingForAck.remove(msgId);
- }
-
- @Override
- public void activate() {
- }
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void deactivate() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void fail(Object msgId) {
- LOG.debug("fail: " + msgId + ", resend event");
- RowChangedEvent event = waitingForAck.get(msgId);
- collector.emit(getStreamId(event), new Values(event), getMsgId(event));
- }
-
- @Override
- public void nextTuple() {
- RowChangedEvent event = null;
- try {
- event = receiveQueue.take();
- } catch (InterruptedException e) {
- return;
- }
-
- String msgId = getMsgId(event);
- collector.emit(getStreamId(event), new Values(event), msgId);
- waitingForAck.put(msgId, event);
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector _collector) {
- collector = _collector;
- receiveQueue = new LinkedBlockingQueue<RowChangedEvent>();
- waitingForAck = new ConcurrentHashMap<String, RowChangedEvent>();
-
- ConfigurationBuilder configBuilder = new ConfigurationBuilder();
- configBuilder.ddl(false);
- configBuilder.dml(true);
- configBuilder.transaction(false);
- if (pumaSeqFileBase != null) {
- configBuilder.seqFileBase(pumaSeqFileBase);
- }
- configBuilder.host(pumaHost);
- configBuilder.port(pumaPort);
- configBuilder.serverId(pumaServerId);
- configBuilder.name(pumaName);
- for (Entry<String, String[]> e : watchTables.entrySet()) {
- String db = e.getKey();
- String[] tabs = e.getValue();
- configBuilder.tables(db, tabs);
- }
- configBuilder.target(pumaTarget);
- PumaClient pc = new PumaClient(configBuilder.build());
-
- listener = new PumaEventListener();
- pc.register(listener);
- pc.start();
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for (Entry<String, String[]> entry : watchTables.entrySet()) {
- String db = entry.getKey();
- for (String table : entry.getValue()) {
- String dbTable = db + "." + table;
- declarer.declareStream(dbTable, new Fields("event"));
- }
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/RedisSinkBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/RedisSinkBolt.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/RedisSinkBolt.java
deleted file mode 100755
index d2e02e1..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/RedisSinkBolt.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package com.dianping.cosmos;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.exceptions.JedisConnectionException;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-
-public class RedisSinkBolt implements IRichBolt {
- private final Log LOG = LogFactory.getLog(RedisSinkBolt.class);
- private OutputCollector collector;
- private JedisPool pool;
- private Updater updater;
-
- private String redisHost;
- private int redisPort;
- private int timeout;
- private int retryLimit;
-
- public RedisSinkBolt(String redisHost, int redisPort) {
- this(redisHost, redisPort, 50, 3);
- }
-
- public RedisSinkBolt(String redisHost, int redisPort, int retryLimit) {
- this(redisHost, redisPort, 50, retryLimit);
- }
-
- public RedisSinkBolt(String redisHost, int redisPort, int timeout, int retryLimit) {
- this.redisHost = redisHost;
- this.redisPort = redisPort;
- this.timeout = timeout;
- this.retryLimit = retryLimit;
- }
-
- public void setUpdater(Updater updater) {
- this.updater = updater;
- }
-
- @Override
- public void prepare(Map conf, TopologyContext context,
- OutputCollector collector) {
- this.collector = collector;
-
- GenericObjectPoolConfig pconf = new GenericObjectPoolConfig();
- pconf.setMaxWaitMillis(2000);
- pconf.setMaxTotal(1000);
- pconf.setTestOnBorrow(false);
- pconf.setTestOnReturn(false);
- pconf.setTestWhileIdle(true);
- pconf.setMinEvictableIdleTimeMillis(120000);
- pconf.setTimeBetweenEvictionRunsMillis(60000);
- pconf.setNumTestsPerEvictionRun(-1);
-
- pool = new JedisPool(pconf, redisHost, redisPort, timeout);
- }
-
- private byte[] retryGet(byte[] key) {
- int retry = 0;
- byte[] ret;
- while (true) {
- Jedis jedis = null;
- try {
- jedis = pool.getResource();
- ret = jedis.get(key);
- return ret;
- } catch (JedisConnectionException e) {
- if (jedis != null) {
- pool.returnBrokenResource(jedis);
- jedis = null;
- }
- if (retry > retryLimit) {
- throw e;
- }
- retry++;
- } finally {
- if (jedis != null) {
- pool.returnResource(jedis);
- }
- }
- }
- }
-
- private String retrySet(byte[] key, byte[] value) {
- int retry = 0;
- String ret;
- while (true) {
- Jedis jedis = null;
- try {
- jedis = pool.getResource();
- ret = jedis.set(key, value);
- return ret;
- } catch (JedisConnectionException e) {
- if (jedis != null) {
- pool.returnBrokenResource(jedis);
- jedis = null;
- }
- if (retry > retryLimit) {
- throw e;
- }
- retry++;
- } finally {
- if (jedis != null) {
- pool.returnResource(jedis);
- }
- }
-
- }
- }
-
- @Override
- public void execute(Tuple input) {
- byte[] key = input.getBinary(0);
- byte[] value = input.getBinary(1);
-
- if (key == null || value == null) {
- collector.ack(input);
- return;
- }
-
- try {
- if (updater != null) {
- byte[] oldValue = retryGet(key);
- byte[] newValue = updater.update(oldValue, value);
- if (newValue == null) {
- collector.ack(input);
- return;
- }
- retrySet(key, newValue);
- collector.ack(input);
- return;
- }
-
- retrySet(key, value);
- collector.ack(input);
- } catch (JedisConnectionException e) {
- LOG.warn("JedisConnectionException catched ", e);
- collector.fail(input);
- }
- }
-
- @Override
- public void cleanup() {
- pool.destroy();
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/Updater.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/Updater.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/Updater.java
deleted file mode 100755
index 04eedf2..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/Updater.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package com.dianping.cosmos;
-
-import java.io.Serializable;
-
-public interface Updater extends Serializable {
-
- byte[] update(byte[] oldValue, byte[] newValue);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/metric/CatMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/metric/CatMetricsConsumer.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/metric/CatMetricsConsumer.java
deleted file mode 100755
index 1e2ff06..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/metric/CatMetricsConsumer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.dianping.cosmos.metric;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
-
-import com.dianping.cosmos.monitor.HttpCatClient;
-import com.dianping.cosmos.util.CatMetricUtil;
-
-/**
- * Listens for all metrics, dumps them to cat
- *
- * To use, add this to your topology's configuration:
- * conf.registerMetricsConsumer(com.dianping.cosmos.metric.CatSpoutMetricsConsumer.class, 1);
- *
- * Or edit the storm.yaml config file:
- *
- * topology.metrics.consumer.register:
- * - class: "com.dianping.cosmos.metric.CatSpoutMetricsConsumer"
- * parallelism.hint: 1
- *
- */
-@SuppressWarnings("rawtypes")
-public class CatMetricsConsumer implements IMetricsConsumer {
- private static final Logger LOGGER = LoggerFactory.getLogger(CatMetricsConsumer.class);
- private String stormId;
-
- @Override
- public void prepare(Map stormConf, Object registrationArgument,
- TopologyContext context, IErrorReporter errorReporter) {
- stormId = context.getStormId();
- }
-
-
- @Override
- public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
- for (DataPoint p : dataPoints) {
- try{
- if(CatMetricUtil.isCatMetric(p.name)){
- HttpCatClient.sendMetric(getTopologyName(),
- CatMetricUtil.getCatMetricKey(p.name), "sum", String.valueOf(p.value));
- }
- }
- catch(Exception e){
- LOGGER.warn("send metirc 2 cat error.", e);
- }
- }
- }
-
- private String getTopologyName(){
- return StringUtils.substringBefore(stormId, "-");
- }
-
- @Override
- public void cleanup() {
- }
-
- public static void main(String[] args){
- CatMetricsConsumer c = new CatMetricsConsumer();
- c.stormId = "HippoUV_25-15-1410857734";
- System.out.println(c.getTopologyName());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpCatClient.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpCatClient.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpCatClient.java
deleted file mode 100755
index 9fe6987..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpCatClient.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.dianping.cosmos.monitor;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HttpCatClient {
- private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientService.class);
-
- private HttpCatClient(){
- }
-
- private static HttpClientService httClientSerivce = new HttpClientService();
-
- private static List<String> CAT_SERVERS = new ArrayList<String>();
- //初始化访问的server的地址
- private static AtomicInteger CURRENT_SERVER_INDEX = new AtomicInteger(0);
-
- static{
- CAT_SERVERS.add("http://cat02.nh:8080/");
- CAT_SERVERS.add("http://cat03.nh:8080/");
- CAT_SERVERS.add("http://cat04.nh:8080/");
- CAT_SERVERS.add("http://cat05.nh:8080/");
- CAT_SERVERS.add("http://cat06.nh:8080/");
- }
-
- public static void sendMetric(String domain, String key, String op, String value){
- String server = getServer();
- try{
- StringBuilder request = new StringBuilder();
- request.append(server);
- request.append("cat/r/monitor?timestamp=");
- request.append(System.currentTimeMillis());
- request.append("&group=Storm&domain=");
- request.append(domain);
- request.append("&key=");
- request.append(key);
- request.append("&op=");
- request.append(op);
- request.append("&" + op +"=");
- request.append(value);
- httClientSerivce.get(request.toString());
- }
- catch(Exception e){
- CURRENT_SERVER_INDEX.getAndIncrement();
- LOGGER.error("send to cat " + server + " error.", e);
- }
- }
-
- private static String getServer(){
- int index = CURRENT_SERVER_INDEX.get() % CAT_SERVERS.size();
- return CAT_SERVERS.get(index);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpClientService.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpClientService.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpClientService.java
deleted file mode 100755
index 843e5da..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpClientService.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package com.dianping.cosmos.monitor;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URL;
-import java.util.List;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.NameValuePair;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.params.CoreConnectionPNames;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * 向cat写入metric相关信息
- * @author xinchun.wang
- *
- */
-public class HttpClientService {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientService.class);
-
-// private static JSONUtil jsonUtil = JSONUtil.getInstance();
-
- protected String excuteGet(String url, boolean useURI) throws Exception {
- HttpClient httpClient = getHttpClient();
- HttpUriRequest request = getGetRequest(url, useURI);
-
- HttpResponse httpResponse = httpClient.execute(request);
-
- String response = parseResponse(url, httpResponse);
- return response;
- }
-
- protected String excutePost(String url, List<NameValuePair> nvps) throws Exception {
- HttpClient httpClient = getHttpClient();
- HttpPost httpPost = new HttpPost(url);
- httpPost.setEntity(new UrlEncodedFormEntity(nvps));
- HttpResponse httpResponse = httpClient.execute(httpPost);
- String response = parseResponse(url, httpResponse);
- return response;
- }
-
- private String parseResponse(String url, HttpResponse httpResponse)
- throws Exception, IOException {
- int status = httpResponse.getStatusLine().getStatusCode();
- if(status != 200){
- String errorMsg = "Error occurs in calling acl service: " + url + ", with status:" + status;
- throw new Exception(errorMsg);
- }
- HttpEntity entry = httpResponse.getEntity();
- String response = EntityUtils.toString(entry, "UTF-8");
- return response;
- }
-
- private HttpClient getHttpClient() {
- HttpClient httpClient = new DefaultHttpClient();
- httpClient.getParams().setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 5000);
- httpClient.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT, 5000);
- return httpClient;
- }
-
- private HttpUriRequest getGetRequest(String url, boolean useURI) throws Exception {
- HttpUriRequest request;
- if(useURI){
- URL requestURL = new URL(url);
- URI uri = new URI(
- requestURL.getProtocol(),
- null,
- requestURL.getHost(),
- requestURL.getPort(),
- requestURL.getPath(),
- requestURL.getQuery(),
- null);
- request = new HttpGet(uri);
- }
- else{
- request = new HttpGet(url);
- }
- return request;
- }
-
-// protected boolean parseResultMap(String response, String url) throws Exception{
-// Map<?, ?> result = jsonUtil.formatJSON2Map(response);
-// if(result == null){
-// return false;
-// }
-// String code = (String)result.get("statusCode");
-// if("-1".equals(code)){
-// throw new Exception(String.valueOf(result.get("errorMsg")));
-// }
-// return true;
-// }
-
-
- public void get(String url) throws Exception{
- String response = excuteGet(url, false);
- if(response == null){
- LOGGER.error("call uri error, response is null, uri = " + url);
- }
- //parseResultMap(response, url);
- }
-
- public void getByURI(String url) throws Exception{
- String response = excuteGet(url, true);
- if(response == null){
- LOGGER.error("call uri error, response is null, uri = " + url);
- }
- //parseResultMap(response, url);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/SpoutCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/SpoutCounter.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/SpoutCounter.java
deleted file mode 100755
index 4e06df3..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/SpoutCounter.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.dianping.cosmos.monitor;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class SpoutCounter {
- private AtomicLong repeatCounter = new AtomicLong(0l);
- private AtomicLong tupleCounter = new AtomicLong(0l);
-
- public void incrRepeatCounter(){
- repeatCounter.incrementAndGet();
- }
-
- public long getRepeatCounter(){
- return repeatCounter.get();
- }
-
- public void incrTupleCounter(long increment){
- tupleCounter.addAndGet(increment);
- }
-
- public long getTupleCounter(){
- return tupleCounter.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/TopologyMonitor.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/TopologyMonitor.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/TopologyMonitor.java
deleted file mode 100755
index 88787ca..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/TopologyMonitor.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.dianping.cosmos.monitor;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.metric.api.IMetricsConsumer.DataPoint;
-import backtype.storm.metric.api.IMetricsConsumer.TaskInfo;
-
-public class TopologyMonitor {
- private static final Logger LOGGER = LoggerFactory.getLogger(TopologyMonitor.class);
-
- private static Map<Integer, SpoutCounter> spoutCounterMap = new ConcurrentHashMap<Integer, SpoutCounter>();
-
- public void monitorStatus(String stormId, TaskInfo taskInfo, DataPoint p) {
- SpoutCounter counter = spoutCounterMap.get(taskInfo.srcTaskId);
- if(counter == null){
- counter = new SpoutCounter();
- spoutCounterMap.put(taskInfo.srcTaskId, counter);
- }
- counter.incrRepeatCounter();
- String value = String.valueOf(p.value);
- long increment = Long.parseLong(value);
- counter.incrTupleCounter(increment);
- //连续1分钟
- if(counter.getRepeatCounter() >= 12){
- //数据量少于某个记录
- LOGGER.info("last minute tuple = " + counter.getTupleCounter());
- if(counter.getTupleCounter() <= 10000){
- LOGGER.error("spout has problem, restar topology....");
- //restartTopology(stormId);
- }
- spoutCounterMap.clear();
- }
- }
-
- /**
- * stromId: MobileUV_7-212-1409657868
- * @param stormId
- */
- public void restartTopology(String stormId){
- String currentTopology = StringUtils.substringBefore(stormId, "-");
- String topologyPrefix = StringUtils.substringBefore(currentTopology, "_");
- String topologyIndex = StringUtils.substringAfter(currentTopology, "_");
- int newIndex = Integer.parseInt(topologyIndex) + 1;
- String newTopologyName = topologyPrefix + "_" + newIndex;
- LOGGER.info("new topology name = " + newTopologyName);
- execStartCommand(newTopologyName);
- LOGGER.info("execStartCommand finish ..");
- execShutdownCommand(currentTopology);
- LOGGER.info("execShutdownCommand finish ..");
- }
-
- public void execStartCommand(String topologyName){
- Process process;
- try {
- process = Runtime.getRuntime().exec(new String[]{
- "/usr/local/storm/bin/storm",
- "jar",
- "/home/hadoop/topology/meteor-traffic-0.0.1.jar",
- "com.dianping.data.warehouse.traffic.mobile.MobileUVTopology",
- topologyName});
- process.waitFor();
- } catch (Exception e) {
- LOGGER.error("", e);
- }
- }
-
- public void execShutdownCommand(String topologyName){
- Process process;
- try {
- process = Runtime.getRuntime().exec(new String[]{
- "/usr/local/storm/bin/storm",
- "kill",
- topologyName,
- "10"});
- process.waitFor();
- } catch (Exception e) {
- LOGGER.error("", e);
- }
- }
-
- public static void main(String[] args){
- TopologyMonitor monitor = new TopologyMonitor();
- monitor.restartTopology("MobileUV_7-212-1409657868");
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoBolt.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoBolt.java
deleted file mode 100755
index 2dd99bd..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoBolt.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package com.dianping.cosmos.monitor.topology;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.thrift7.TException;
-import org.apache.thrift7.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.ExecutorSummary;
-import backtype.storm.generated.Nimbus.Client;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.SupervisorSummary;
-import backtype.storm.generated.TopologyInfo;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-import com.dianping.cosmos.monitor.HttpCatClient;
-import com.dianping.cosmos.util.Constants;
-import com.dianping.cosmos.util.TupleHelpers;
-
-@SuppressWarnings({ "rawtypes", "unchecked"})
-public class ClusterInfoBolt extends BaseRichBolt{
- private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInfoBolt.class);
-
- private static final long serialVersionUID = 1L;
- private transient Client client;
- private transient NimbusClient nimbusClient;
- private OutputCollector collector;
- private Map configMap = null;
-
- @Override
- public void prepare(Map map, TopologyContext topologycontext,
- OutputCollector outputcollector) {
- this.collector = outputcollector;
- this.configMap = map;
- initClient(configMap);
- }
- private void initClient(Map map) {
- nimbusClient = NimbusClient.getConfiguredClient(map);
- client = nimbusClient.getClient();
- }
- @Override
- public void execute(Tuple tuple) {
- if (TupleHelpers.isTickTuple(tuple)) {
- if(nimbusClient == null){
- initClient(configMap);
- }
- getClusterInfo(client);
- collector.emit(new Values(tuple));
- }
- }
-
- private void getClusterInfo(Client client) {
- try {
- ClusterSummary clusterSummary = client.getClusterInfo();
- List<SupervisorSummary> supervisorSummaryList = clusterSummary.get_supervisors();
- int totalWorkers = 0;
- int usedWorkers = 0;
- for(SupervisorSummary summary : supervisorSummaryList){
- totalWorkers += summary.get_num_workers() ;
- usedWorkers += summary.get_num_used_workers();
- }
- int freeWorkers = totalWorkers - usedWorkers;
- LOGGER.info("cluster totalWorkers = " + totalWorkers
- + ", usedWorkers = " + usedWorkers
- + ", freeWorkers = " + freeWorkers);
-
- HttpCatClient.sendMetric("ClusterMonitor", "freeSlots", "avg", String.valueOf(freeWorkers));
- HttpCatClient.sendMetric("ClusterMonitor", "totalSlots", "avg", String.valueOf(totalWorkers));
-
- List<TopologySummary> topologySummaryList = clusterSummary.get_topologies();
- long clusterTPS = 0l;
- for(TopologySummary topology : topologySummaryList){
- long topologyTPS = getTopologyTPS(topology, client);
- clusterTPS += topologyTPS;
- if(topology.get_name().startsWith("ClusterMonitor")){
- continue;
- }
- HttpCatClient.sendMetric(topology.get_name(), topology.get_name() + "-TPS", "avg", String.valueOf(topologyTPS));
- }
- HttpCatClient.sendMetric("ClusterMonitor", "ClusterEmitTPS", "avg", String.valueOf(clusterTPS));
-
- } catch (TException e) {
- initClient(configMap);
- LOGGER.error("get client info error.", e);
- }
- catch(NotAliveException nae){
- LOGGER.warn("topology is dead.", nae);
- }
- }
-
- protected long getTopologyTPS(TopologySummary topology, Client client) throws NotAliveException, TException{
- long topologyTps = 0l;
- String topologyId = topology.get_id();
- if(topologyId.startsWith("ClusterMonitor")){
- return topologyTps;
- }
- TopologyInfo topologyInfo = client.getTopologyInfo(topologyId);
- if(topologyInfo == null){
- return topologyTps;
- }
- List<ExecutorSummary> executorSummaryList = topologyInfo.get_executors();
- for(ExecutorSummary executor : executorSummaryList){
- topologyTps += getComponentTPS(executor);
- }
- LOGGER.info("topology = " + topology.get_name() + ", tps = " + topologyTps);
- return topologyTps;
- }
-
- private long getComponentTPS(ExecutorSummary executor) {
- long componentTps = 0l;
- if(executor == null){
- return componentTps;
- }
- String componentId = executor.get_component_id();
-
- if(Utils.isSystemId(componentId)){
- return componentTps;
- }
- if(executor.get_stats() == null){
- return componentTps;
- }
-
- Map<String, Map<String, Long>> emittedMap = executor.get_stats().get_emitted();
- Map<String, Long> minutesEmitted = emittedMap.get("600");
- if(minutesEmitted == null){
- return componentTps;
- }
- for(Map.Entry<String, Long> emittedEntry : minutesEmitted.entrySet()){
- if(Utils.isSystemId(emittedEntry.getKey())){
- continue;
- }
- if(executor.get_uptime_secs() >= 600){
- componentTps += emittedEntry.getValue() / 600;
- }
- if(executor.get_uptime_secs() >= 10 && executor.get_uptime_secs() < 600){
- componentTps += emittedEntry.getValue() / executor.get_uptime_secs();
- }
- }
- LOGGER.debug("component = " + componentId + ", tps = " + componentTps);
- return componentTps;
- }
-
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {
- outputfieldsdeclarer.declare(new Fields("monitor"));
- }
-
- @Override
- public Map getComponentConfiguration(){
- Map<String, Object> conf = new HashMap<String, Object>();
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Constants.TPS_COUNTER_FREQUENCY_IN_SECONDS);
- return conf;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoTopology.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoTopology.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoTopology.java
deleted file mode 100755
index 2c9e9c5..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoTopology.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.dianping.cosmos.monitor.topology;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-
-public class ClusterInfoTopology {
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setBolt("ClusterInfo", new ClusterInfoBolt(), 1);
- Config conf = new Config();
- conf.setNumWorkers(1);
-
- StormSubmitter.submitTopology("ClusterMonitor", conf, builder.createTopology());
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatClient.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatClient.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatClient.java
deleted file mode 100755
index 65f4a04..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatClient.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.dianping.cosmos.util;
-import com.dianping.cat.Cat;
-
-public class CatClient {
-
- private CatClient(){
- }
-
- private static Cat CAT = Cat.getInstance();
-
- static{
- Cat.initialize("cat02.nh","cat03.nh","cat04.nh","cat05.nh");
- }
-
- public static Cat getInstance(){
- return CAT;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatMetricUtil.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatMetricUtil.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatMetricUtil.java
deleted file mode 100755
index 0c4b183..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatMetricUtil.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.dianping.cosmos.util;
-
-import org.apache.commons.lang.StringUtils;
-
-public class CatMetricUtil {
- private static final String CAT_METRIC_NAME_PREFIX = "Cat#";
-
- /**
- * 返回BlackHoleSout的metric名称
- * @param topic
- * @param group
- * @return
- */
- public static String getSpoutMetricName(String topic, String group){
- return CAT_METRIC_NAME_PREFIX.concat(topic).concat("[").concat(group).concat("]");
- }
-
- /**
- * 判断是否cat的metirc
- * @param dataPointName
- * @return
- */
- public static boolean isCatMetric(String dataPointName){
- if(StringUtils.isBlank(dataPointName)){
- return false;
- }
- return StringUtils.startsWith(dataPointName, CAT_METRIC_NAME_PREFIX);
- }
-
-
-
- /**
- * 根据metric的名字,返回写入cat上的key
- * @param spoutMetricName
- * @return
- */
- public static String getCatMetricKey(String spoutMetricName){
- if(StringUtils.isBlank(spoutMetricName)
- || !StringUtils.startsWith(spoutMetricName, CAT_METRIC_NAME_PREFIX)){
- return "default";
- }
- return StringUtils.substringAfter(spoutMetricName, CAT_METRIC_NAME_PREFIX);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/Constants.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/Constants.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/Constants.java
deleted file mode 100755
index c5edf1a..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/Constants.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package com.dianping.cosmos.util;
-
-public class Constants {
-
- public static final int EMIT_FREQUENCY_IN_SECONDS = 5;
-
- public static final int TPS_COUNTER_FREQUENCY_IN_SECONDS = 30;
-
-}