You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:36 UTC

[57/60] [abbrv] storm git commit: remove jstorm-utility directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/pom.xml
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/pom.xml b/jstorm-utility/ons/pom.xml
deleted file mode 100755
index fc8f06a..0000000
--- a/jstorm-utility/ons/pom.xml
+++ /dev/null
@@ -1,101 +0,0 @@
-<?xml version="1.0"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-    <parent>
-		<groupId>com.taobao</groupId>
-		<artifactId>parent</artifactId>
-		<version>1.0.2</version>
-  </parent>
-  
-  <!--
-  <parent>
-    <groupId>com.alibaba.aloha</groupId>
-    <artifactId>aloha-utility</artifactId>
-    <version>0.2.0-SNAPSHOT</version>
-  </parent> -->
-  <groupId>com.alibaba.aloha</groupId>
-  <artifactId>ons</artifactId>
-  <version>0.2.0</version>
-
-	<build>
-		<plugins>
-			<plugin>
-				<artifactId>maven-assembly-plugin</artifactId>
-				<configuration>
-					<descriptorRefs>
-						<descriptorRef>jar-with-dependencies</descriptorRef>
-					</descriptorRefs>
-				</configuration>
-				<executions>
-					<execution>
-						<id>make-assembly</id>
-						<phase>package</phase>
-						<goals>
-							<goal>single</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-
-	<properties>
-		<jstorm.version>0.9.7</jstorm.version>
-	</properties>
-  
-  <dependencies>
-
-		<dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-client</artifactId>
-			<version>${jstorm.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-client-extension</artifactId>
-			<version>${jstorm.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-server</artifactId>
-			<version>${jstorm.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>com.aliyun.openservices</groupId>
-			<artifactId>ons-client</artifactId>
-			<version>1.1.5</version>
-		</dependency>
-		<!-- 
-		<dependency>
-			<groupId>com.alibaba.rocketmq</groupId>
-			<artifactId>rocketmq-common</artifactId>
-			<version>3.0.1</version>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba.rocketmq</groupId>
-			<artifactId>rocketmq-client</artifactId>
-			<version>3.0.1</version>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba.rocketmq</groupId>
-			<artifactId>rocketmq-remoting</artifactId>
-			<version>3.0.1</version>
-		</dependency>
-		 -->
-
-	</dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/LoadConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/LoadConfig.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/LoadConfig.java
deleted file mode 100755
index 6f062eb..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/LoadConfig.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package com.alibaba.jstorm;
-
-import org.yaml.snakeyaml.Yaml;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Loads a topology configuration file into a {@link Map}.
- *
- * Paths ending in "yaml" are parsed with SnakeYAML; every other path is read
- * as a java.util.Properties file.
- */
-public class LoadConfig {
-	public static final String TOPOLOGY_TYPE = "topology.type";
-
-	/**
-	 * Reads a properties file.
-	 *
-	 * @param prop path of the properties file
-	 * @return the loaded entries, or null when the file is missing or cannot be read
-	 */
-	private static Map LoadProperty(String prop) {
-		Map ret = null;
-		Properties properties = new Properties();
-		InputStream stream = null;
-
-		try {
-			stream = new FileInputStream(prop);
-			properties.load(stream);
-			ret = new HashMap<Object, Object>();
-			ret.putAll(properties);
-		} catch (FileNotFoundException e) {
-			System.out.println("No such file " + prop);
-		} catch (Exception e1) {
-			e1.printStackTrace();
-		} finally {
-			// the stream used to be left open; always release the file handle
-			closeQuietly(stream);
-		}
-
-		return ret;
-	}
-
-	/**
-	 * Reads a YAML file whose top-level node is a mapping.
-	 *
-	 * @param confPath path of the YAML file
-	 * @return the parsed, non-empty map
-	 * @throws RuntimeException when the file is missing, unreadable or empty
-	 */
-	private static Map LoadYaml(String confPath) {
-		Map ret = null;
-		// NOTE(review): Yaml.load can instantiate arbitrary types named in the
-		// document; only feed it configuration files from a trusted operator.
-		Yaml yaml = new Yaml();
-		InputStream stream = null;
-
-		try {
-			stream = new FileInputStream(confPath);
-
-			ret = (Map) yaml.load(stream);
-			if (ret == null || ret.isEmpty() == true) {
-				throw new RuntimeException("Failed to read config file");
-			}
-
-		} catch (FileNotFoundException e) {
-			System.out.println("No such file " + confPath);
-			// keep the original failure attached as the cause
-			throw new RuntimeException("No config file", e);
-		} catch (Exception e1) {
-			e1.printStackTrace();
-			throw new RuntimeException("Failed to read config file", e1);
-		} finally {
-			closeQuietly(stream);
-		}
-
-		return ret;
-	}
-
-	/**
-	 * Closes the stream if it was opened, ignoring any failure while closing.
-	 */
-	private static void closeQuietly(InputStream stream) {
-		if (stream == null) {
-			return;
-		}
-		try {
-			stream.close();
-		} catch (java.io.IOException ignored) {
-			// nothing useful can be done when closing a read-only stream fails
-		}
-	}
-
-	/**
-	 * Loads the configuration file at the given path.
-	 *
-	 * @param arg path of the configuration file
-	 * @return the configuration; null when a properties file could not be loaded
-	 * @throws RuntimeException when a YAML file is missing, unreadable or empty
-	 */
-	public static Map LoadConf(String arg) {
-		Map ret = null;
-
-		if (arg.endsWith("yaml")) {
-			ret = LoadYaml(arg);
-		} else {
-			ret = LoadProperty(arg);
-		}
-
-		return ret;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/TestTopology.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/TestTopology.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/TestTopology.java
deleted file mode 100755
index 9ad17de..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/TestTopology.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.alibaba.jstorm;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-
-import com.alibaba.jstorm.ons.consumer.ConsumerSpout;
-import com.alibaba.jstorm.ons.producer.ProducerBolt;
-import com.alibaba.jstorm.utils.JStormUtils;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Command line entry point that wires a ConsumerSpout to a ProducerBolt and
- * submits the topology, either to an in-process LocalCluster or through
- * StormSubmitter, depending on the loaded configuration.
- */
-public class TestTopology {
-
-	private static Map conf = new HashMap<Object, Object>();
-
-	/**
-	 * @param args args[0] is the path of the configuration file
-	 */
-	public static void main(String[] args) throws Exception {
-		if (args.length == 0) {
-			System.err.println("Please input configuration file");
-			System.exit(-1);
-		}
-
-		conf = LoadConfig.LoadConf(args[0]);
-		if (conf == null) {
-			// LoadConf yields null for a missing/unreadable properties file;
-			// stop here instead of failing later with a NullPointerException
-			System.err.println("Failed to load configuration file " + args[0]);
-			System.exit(-1);
-		}
-
-		TopologyBuilder builder = setupBuilder();
-
-		submitTopology(builder);
-
-	}
-
-	/**
-	 * Builds the spout -> bolt topology; both parallelism hints default to 1.
-	 */
-	private static TopologyBuilder setupBuilder() throws Exception {
-		TopologyBuilder builder = new TopologyBuilder();
-
-		int writerParallel = JStormUtils.parseInt(conf.get("topology.producer.parallel"), 1);
-
-		int spoutParallel = JStormUtils.parseInt(conf.get("topology.consumer.parallel"), 1);
-
-		builder.setSpout("OnsConsumer", new ConsumerSpout(), spoutParallel);
-
-		builder.setBolt("OnsProducer", new ProducerBolt(), writerParallel).localFirstGrouping("OnsConsumer");
-
-		return builder;
-	}
-
-	/**
-	 * Submits the topology. In local mode the cluster runs for 200 seconds and
-	 * is then shut down; failures are printed rather than propagated.
-	 */
-	private static void submitTopology(TopologyBuilder builder) {
-		try {
-			String topologyName = String.valueOf(conf.get("topology.name"));
-
-			if (local_mode(conf)) {
-
-				LocalCluster cluster = new LocalCluster();
-				try {
-					cluster.submitTopology(topologyName, conf, builder.createTopology());
-
-					Thread.sleep(200000);
-				} finally {
-					// shut the cluster down even when submit or sleep fails
-					cluster.shutdown();
-				}
-			} else {
-				StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
-			}
-
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-	}
-
-	/**
-	 * @return true when Config.STORM_CLUSTER_MODE is set to "local"
-	 */
-	public static boolean local_mode(Map conf) {
-		String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
-
-		return "local".equals(mode);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsConfig.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsConfig.java
deleted file mode 100644
index c8a9b63..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsConfig.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package com.alibaba.jstorm.ons;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
-
-public class OnsConfig implements Serializable{
-
-	private static final long serialVersionUID = -3911741873533333336L;
-	
-	private final String topic;
-	private final String subExpress;
-	private final String accessKey;
-	private final String secretKey;
-	
-	public OnsConfig(Map conf) {
-		topic = (String)conf.get("Topic");
-		if (conf.get("SubExpress") != null) {
-			subExpress = (String)conf.get("SubExpress");
-		}else {
-			subExpress = "*";
-		}
-		accessKey = (String)conf.get(PropertyKeyConst.AccessKey);
-		secretKey = (String)conf.get(PropertyKeyConst.SecretKey);
-		
-		checkValid();
-		
-	}
-	
-	public void checkValid() {
-		if (StringUtils.isBlank(topic) == true) {
-			throw new RuntimeException("Topic hasn't been set");
-		}else if (StringUtils.isBlank(subExpress)) {
-			throw new RuntimeException("SubExpress hasn't been set");
-		}else if (StringUtils.isBlank(accessKey)) {
-			throw new RuntimeException(PropertyKeyConst.AccessKey + " hasn't been set");			
-		}else if (StringUtils.isBlank(secretKey)) {
-			throw new RuntimeException(PropertyKeyConst.SecretKey + " hasn't been set");
-		}
-		
-	}
-
-	public String getTopic() {
-		return topic;
-	}
-
-	public String getSubExpress() {
-		return subExpress;
-	}
-
-	public String getAccessKey() {
-		return accessKey;
-	}
-
-	public String getSecretKey() {
-		return secretKey;
-	}
-	
-	@Override
-    public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsTuple.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsTuple.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsTuple.java
deleted file mode 100644
index c3a3e5d..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/OnsTuple.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.alibaba.jstorm.ons;
-
-import com.aliyun.openservices.ons.api.Message;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-import java.io.Serializable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Wraps one ONS {@link Message} together with the bookkeeping the spout needs:
- * a failure counter, creation/emit timestamps and a latch that is released
- * once the tuple has been marked done or failed.
- */
-public class OnsTuple implements Serializable {
-
-	private static final long serialVersionUID = 2277714452693486955L;
-
-	protected final Message message;
-
-	protected final AtomicInteger failureTimes;
-	protected final long createMs;
-	protected long emitMs;
-
-	// not serialized: completion signalling only exists in the creating JVM
-	protected transient CountDownLatch latch;
-	protected transient boolean isSuccess;
-
-	/**
-	 * @param message the ONS message carried by this tuple
-	 */
-	public OnsTuple(Message message) {
-		this.message = message;
-		this.failureTimes = new AtomicInteger(0);
-		this.createMs = System.currentTimeMillis();
-		this.latch = new CountDownLatch(1);
-		this.isSuccess = false;
-	}
-
-	// ---- completion signalling ----
-
-	/** Marks the tuple as successfully processed and releases any waiter. */
-	public void done() {
-		this.isSuccess = true;
-		this.latch.countDown();
-	}
-
-	/** Marks the tuple as failed and releases any waiter. */
-	public void fail() {
-		this.isSuccess = false;
-		this.latch.countDown();
-	}
-
-	/**
-	 * Blocks until done() or fail() is called, for at most four hours.
-	 *
-	 * @return true when the latch was released, false on timeout
-	 */
-	public boolean waitFinish() throws InterruptedException {
-		return this.latch.await(4, TimeUnit.HOURS);
-	}
-
-	public boolean isSuccess() {
-		return this.isSuccess;
-	}
-
-	// ---- timing ----
-
-	/** Records the current wall-clock time as the emit time. */
-	public void updateEmitMs() {
-		this.emitMs = System.currentTimeMillis();
-	}
-
-	public long getEmitMs() {
-		return this.emitMs;
-	}
-
-	public long getCreateMs() {
-		return this.createMs;
-	}
-
-	// ---- payload ----
-
-	public Message getMessage() {
-		return this.message;
-	}
-
-	public AtomicInteger getFailureTimes() {
-		return this.failureTimes;
-	}
-
-	@Override
-	public String toString() {
-		return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerConfig.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerConfig.java
deleted file mode 100644
index 00fac77..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerConfig.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.alibaba.jstorm.ons.consumer;
-
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.alibaba.jstorm.ons.OnsConfig;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
-
-/**
- * Consumer-side settings on top of {@link OnsConfig}: consumer id, number of
- * consume threads (default 4) and an optional name server address.
- */
-public class ConsumerConfig extends OnsConfig{
-
-	private static final long serialVersionUID = 4292162795544528064L;
-
-	private final String consumerId;
-	private final int    consumerThreadNum;
-	private final String nameServer;
-
-	/**
-	 * @param conf configuration map holding the PropertyKeyConst entries
-	 * @throws RuntimeException when the consumer id is blank, or when a
-	 *             different name server address is already registered
-	 */
-	public ConsumerConfig(Map conf) {
-		super(conf);
-
-		String configuredId = (String)conf.get(PropertyKeyConst.ConsumerId);
-		if (StringUtils.isBlank(configuredId)) {
-			throw new RuntimeException(PropertyKeyConst.ConsumerId  + " hasn't been set");
-		}
-		this.consumerId = configuredId;
-
-		Object threadSetting = conf.get(PropertyKeyConst.ConsumeThreadNums);
-		this.consumerThreadNum = JStormUtils.parseInt(threadSetting, 4);
-
-		this.nameServer = (String)conf.get(PropertyKeyConst.NAMESRV_ADDR);
-		registerNameServer(this.nameServer);
-	}
-
-	/**
-	 * Publishes the address through the "rocketmq.namesrv.domain" system
-	 * property, refusing to overwrite a different value that is already set.
-	 */
-	private static void registerNameServer(String address) {
-		if (address == null) {
-			return;
-		}
-
-		final String propertyName = "rocketmq.namesrv.domain";
-		String registered = System.getProperty(propertyName);
-
-		if (registered == null) {
-			System.setProperty(propertyName, address);
-			return;
-		}
-
-		if (!registered.equals(address)) {
-			throw new RuntimeException("Different nameserver address in the same worker " + registered + ":"
-					+ address);
-		}
-	}
-
-	public String getConsumerId() {
-		return this.consumerId;
-	}
-
-	public int getConsumerThreadNum() {
-		return this.consumerThreadNum;
-	}
-
-	public String getNameServer() {
-		return this.nameServer;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerFactory.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerFactory.java
deleted file mode 100644
index b16a5c6..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.alibaba.jstorm.ons.consumer;
-
-import com.aliyun.openservices.ons.api.Consumer;
-import com.aliyun.openservices.ons.api.MessageListener;
-import com.aliyun.openservices.ons.api.ONSFactory;
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
-import org.apache.log4j.Logger;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Creates at most one started ONS {@link Consumer} per consumer id and keeps
- * it in a static registry.
- */
-public class ConsumerFactory {
-
-	private static final Logger LOG = Logger.getLogger(ConsumerFactory.class);
-
-	public static Map<String, Consumer> consumers = new HashMap<String, Consumer>();
-
-	/**
-	 * Creates, subscribes and starts the consumer for the configured id.
-	 *
-	 * @param consumerConfig consumer settings (keys, id, thread count, topic)
-	 * @param listener       callback that receives the consumed messages
-	 * @return the new consumer, or null when a consumer with the same id has
-	 *         already been created (the caller must treat null as "duplicate")
-	 */
-	public static synchronized Consumer mkInstance(ConsumerConfig consumerConfig, MessageListener listener) throws Exception {
-
-		String consumerId = consumerConfig.getConsumerId();
-		Consumer consumer = consumers.get(consumerId);
-		if (consumer != null) {
-
-			LOG.info("Consumer of " + consumerId + " has been created, don't recreate it ");
-
-			// Attention, this place return null to info duplicated consumer
-			return null;
-		}
-
-		Properties properties = new Properties();
-		properties.put(PropertyKeyConst.AccessKey, consumerConfig.getAccessKey());
-		properties.put(PropertyKeyConst.SecretKey, consumerConfig.getSecretKey());
-		properties.put(PropertyKeyConst.ConsumerId, consumerId);
-		// Properties.getProperty() returns null for any value that is not a
-		// String, so the thread count must be stored as text to be readable.
-		properties.put(PropertyKeyConst.ConsumeThreadNums,
-				String.valueOf(consumerConfig.getConsumerThreadNum()));
-		consumer = ONSFactory.createConsumer(properties);
-
-		consumer.subscribe(consumerConfig.getTopic(), consumerConfig.getSubExpress(), listener);
-		consumer.start();
-
-		consumers.put(consumerId, consumer);
-		LOG.info("Successfully create " + consumerId + " consumer");
-
-		return consumer;
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerSpout.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerSpout.java
deleted file mode 100644
index b32186d..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/consumer/ConsumerSpout.java
+++ /dev/null
@@ -1,268 +0,0 @@
-package com.alibaba.jstorm.ons.consumer;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import com.alibaba.jstorm.client.metric.MetricClient;
-import com.alibaba.jstorm.client.spout.IAckValueSpout;
-import com.alibaba.jstorm.client.spout.IFailValueSpout;
-import com.alibaba.jstorm.metric.JStormHistogram;
-import com.alibaba.jstorm.ons.OnsTuple;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.aliyun.openservices.ons.api.*;
-import org.apache.log4j.Logger;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-/**
- * Spout that receives messages from an ONS consumer and emits each of them as
- * a single-field tuple holding an {@link OnsTuple}.
- *
- * The class is also the ONS {@link MessageListener}: consume() is invoked by
- * the consumer for every message. With flow control on, messages are queued
- * and emitted from nextTuple(); otherwise they are emitted directly from
- * consume(). Unless auto-ack is on, consume() waits until the tuple has been
- * acked or has failed too often before answering the consumer.
- */
-public class ConsumerSpout implements IRichSpout, IAckValueSpout, IFailValueSpout, MessageListener {
-    /**  */
-    private static final long serialVersionUID = 8476906628618859716L;
-    private static final Logger LOG = Logger.getLogger(ConsumerSpout.class);
-
-    public static final String ONS_SPOUT_FLOW_CONTROL = "OnsSpoutFlowControl";
-    public static final String ONS_SPOUT_AUTO_ACK = "OnsSpoutAutoAck";
-    public static final String ONS_MSG_MAX_FAIL_TIMES = "OnsMsgMaxFailTimes";
-
-    protected SpoutOutputCollector collector;
-    protected transient Consumer consumer;
-    protected transient ConsumerConfig consumerConfig;
-
-    protected Map conf;
-    protected String id;
-    protected boolean flowControl;
-    protected boolean autoAck;
-    protected long    maxFailTimes;
-    protected boolean active = true;
-
-    protected transient LinkedBlockingDeque<OnsTuple> sendingQueue;
-
-    protected transient MetricClient metricClient;
-    protected transient JStormHistogram waithHistogram;
-    protected transient JStormHistogram processHistogram;
-
-
-    public ConsumerSpout() {
-
-    }
-
-
-    /**
-     * Registers the two histograms updated by finishTuple().
-     */
-    public void initMetricClient(TopologyContext context) {
-        metricClient = new MetricClient(context);
-        waithHistogram = metricClient.registerHistogram("OnsTupleWait", null);
-        processHistogram = metricClient.registerHistogram("OnsTupleProcess", null);
-    }
-
-
-    /**
-     * Reads the spout options (flow control defaults to true, auto-ack to
-     * false, max fail times to 5), registers metrics and creates the consumer.
-     *
-     * @throws RuntimeException when the consumer cannot be created
-     */
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        this.conf = conf;
-        this.collector = collector;
-        this.id = context.getThisComponentId() + ":" + context.getThisTaskId();
-        this.sendingQueue = new LinkedBlockingDeque<OnsTuple>();
-
-        this.flowControl = JStormUtils.parseBoolean(conf.get(ONS_SPOUT_FLOW_CONTROL), true);
-        this.autoAck = JStormUtils.parseBoolean(conf.get(ONS_SPOUT_AUTO_ACK), false);
-        this.maxFailTimes = JStormUtils.parseLong(conf.get(ONS_MSG_MAX_FAIL_TIMES), 5);
-
-        StringBuilder sb = new StringBuilder();
-        sb.append("Begin to init MetaSpout:").append(id);
-        sb.append(", flowControl:").append(flowControl);
-        sb.append(", autoAck:").append(autoAck);
-        LOG.info(sb.toString());
-
-        initMetricClient(context);
-
-        try {
-            consumerConfig = new ConsumerConfig(conf);
-            consumer = ConsumerFactory.mkInstance(consumerConfig, this);
-        }
-        catch (Exception e) {
-            LOG.error("Failed to create Meta Consumer ", e);
-            throw new RuntimeException("Failed to create MetaConsumer" + id, e);
-        }
-
-        if (consumer == null) {
-            // mkInstance answers null when this worker already owns a consumer
-            // with the same id, so this spout instance will receive nothing
-            LOG.warn(id + " already exist consumer in current worker, don't need to fetch data ");
-
-            startDuplicateConsumerReminder();
-        }
-
-        LOG.info("Successfully init " + id);
-    }
-
-
-    /**
-     * Starts a background thread that logs every 10 seconds that this spout
-     * is idle. The thread is a daemon so it can never keep the JVM alive on
-     * its own (it previously ran as a non-daemon thread that was never stopped).
-     */
-    private void startDuplicateConsumerReminder() {
-        Thread reminder = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                while (true) {
-                    try {
-                        Thread.sleep(10000);
-                    }
-                    catch (InterruptedException e) {
-                        break;
-                    }
-
-                    StringBuilder sb = new StringBuilder();
-                    sb.append("Only on meta consumer can be run on one process,");
-                    sb.append(" but there are mutliple spout consumes with the same topic@groupid meta, so the second one ");
-                    sb.append(id).append(" do nothing ");
-                    LOG.info(sb.toString());
-                }
-            }
-        }, "OnsDuplicateConsumerReminder");
-        reminder.setDaemon(true);
-        reminder.start();
-    }
-
-
-    @Override
-    public void close() {
-        if (consumer != null && active == true) {
-            active = false;
-            consumer.shutdown();
-
-        }
-    }
-
-
-    @Override
-    public void activate() {
-        if (consumer != null && active == false) {
-            active = true;
-            consumer.start();
-        }
-
-    }
-
-
-    @Override
-    public void deactivate() {
-        if (consumer != null && active == true) {
-            active = false;
-            consumer.shutdown();
-        }
-    }
-
-
-    /**
-     * Stamps the emit time and emits the tuple, using its creation time as
-     * the message id.
-     */
-    public void sendTuple(OnsTuple OnsTuple) {
-        OnsTuple.updateEmitMs();
-        collector.emit(new Values(OnsTuple), OnsTuple.getCreateMs());
-    }
-
-
-    /**
-     * Blocks until a queued tuple is available and emits it.
-     */
-    @Override
-    public void nextTuple() {
-        OnsTuple OnsTuple = null;
-        try {
-            OnsTuple = sendingQueue.take();
-        }
-        catch (InterruptedException ignored) {
-            // interrupted while waiting: emit nothing on this call
-        }
-
-        if (OnsTuple == null) {
-            return;
-        }
-
-        sendTuple(OnsTuple);
-
-    }
-
-
-    @Deprecated
-    public void ack(Object msgId) {
-        LOG.warn("Shouldn't go this function");
-    }
-
-
-    @Deprecated
-    public void fail(Object msgId) {
-        LOG.warn("Shouldn't go this function");
-    }
-
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("OnsTuple"));
-    }
-
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-
-    /**
-     * Retries a failed tuple until it has failed more than maxFailTimes, at
-     * which point it is finished (and therefore reported as done).
-     */
-    @Override
-    public void fail(Object msgId, List<Object> values) {
-        OnsTuple OnsTuple = (OnsTuple) values.get(0);
-        AtomicInteger failTimes = OnsTuple.getFailureTimes();
-
-        int failNum = failTimes.incrementAndGet();
-        if (failNum > maxFailTimes) {
-            LOG.warn("Message " + OnsTuple.getMessage().getMsgID() + " fail times " + failNum);
-            finishTuple(OnsTuple);
-            return;
-        }
-
-        if (flowControl) {
-            sendingQueue.offer(OnsTuple);
-        }
-        else {
-            sendTuple(OnsTuple);
-        }
-    }
-
-
-    /**
-     * Records wait and processing times, then releases the waiting consumer
-     * thread by marking the tuple done.
-     */
-    public void finishTuple(OnsTuple OnsTuple) {
-        waithHistogram.update(OnsTuple.getEmitMs() - OnsTuple.getCreateMs());
-        processHistogram.update(System.currentTimeMillis() - OnsTuple.getEmitMs());
-        OnsTuple.done();
-    }
-
-
-    @Override
-    public void ack(Object msgId, List<Object> values) {
-        OnsTuple OnsTuple = (OnsTuple) values.get(0);
-        finishTuple(OnsTuple);
-    }
-
-
-    public Consumer getConsumer() {
-        return consumer;
-    }
-
-
-    /**
-     * Listener callback: hands the message to the topology and tells the
-     * consumer whether to commit it or deliver it again later.
-     *
-     * @return CommitMessage when auto-ack is on or the tuple finished
-     *         successfully; ReconsumeLater otherwise or on any exception
-     */
-    @Override
-    public Action consume(Message message, ConsumeContext context) {
-        try {
-            OnsTuple OnsTuple = new OnsTuple(message);
-
-            if (flowControl) {
-                sendingQueue.offer(OnsTuple);
-            }
-            else {
-                sendTuple(OnsTuple);
-            }
-
-            if (autoAck) {
-                return Action.CommitMessage;
-            }
-            else {
-                OnsTuple.waitFinish();
-                if (OnsTuple.isSuccess() == true) {
-                    return Action.CommitMessage;
-                }
-                else {
-                    return Action.ReconsumeLater;
-                }
-            }
-
-        }
-        catch (Exception e) {
-            if (e instanceof InterruptedException) {
-                // keep the interrupt visible to the thread that called us
-                Thread.currentThread().interrupt();
-            }
-            LOG.error("Failed to emit " + id, e);
-            return Action.ReconsumeLater;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerBolt.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerBolt.java
deleted file mode 100644
index 2a65e54..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerBolt.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package com.alibaba.jstorm.ons.producer;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-
-import com.alibaba.jstorm.ons.OnsTuple;
-import com.alibaba.jstorm.utils.RunCounter;
-import com.aliyun.openservices.ons.api.Message;
-import com.aliyun.openservices.ons.api.Producer;
-import com.aliyun.openservices.ons.api.SendResult;
-import org.apache.log4j.Logger;
-
-import java.util.Map;
-
-
-/**
- * Bolt that republishes every incoming {@link OnsTuple} to ONS through a
- * shared {@link Producer}, acking the tuple on success and failing it when
- * the send throws.
- */
-public class ProducerBolt implements IRichBolt {
-
-    private static final long serialVersionUID = 2495121976857546346L;
-
-    private static final Logger LOG              = Logger.getLogger(ProducerBolt.class);
-
-    protected OutputCollector      collector;
-    protected ProducerConfig       producerConfig;
-    protected Producer             producer;
-    protected RunCounter           runCounter;
-
-    /**
-     * Builds the producer configuration from the topology configuration and
-     * obtains the producer from ProducerFactory.
-     *
-     * @throws RuntimeException wrapping any failure to obtain the producer
-     */
-    @Override
-    public void prepare(Map stormConf, TopologyContext context,
-            OutputCollector collector) {
-        this.collector = collector;
-        this.runCounter = new RunCounter(ProducerBolt.class);
-        this.producerConfig = new ProducerConfig(stormConf);
-        try {
-            this.producer = ProducerFactory.mkInstance(producerConfig);
-        } catch (Exception cause) {
-            throw new RuntimeException(cause);
-        }
-    }
-
-    /**
-     * Sends the body (and key, when present) of the received message to the
-     * configured topic, then acks or fails the tuple.
-     */
-    @Override
-    public void execute(Tuple tuple) {
-        OnsTuple onsTuple = (OnsTuple)tuple.getValue(0);
-        Message received = onsTuple.getMessage();
-        long startMs = System.currentTimeMillis();
-        SendResult sendResult = null;
-        try {
-            // Message body: arbitrary binary data. ONS does not interpret it,
-            // so producer and consumer must agree on the serialization format.
-            Message outgoing = new Message(
-                    producerConfig.getTopic(),
-                    producerConfig.getSubExpress(),
-                    received.getBody());
-
-            // The key is the business identifier of the message and should be
-            // globally unique where possible, so that a message that was not
-            // received can be looked up and resent from the ONS console.
-            // Leaving it unset does not affect normal sending and receiving.
-            String key = received.getKey();
-            if (key != null) {
-                outgoing.setKey(key);
-            }
-
-            // A send counts as successful as long as no exception is thrown.
-            sendResult = producer.send(outgoing);
-
-            LOG.info("Success send msg of " + received.getMsgID());
-            runCounter.count(System.currentTimeMillis() - startMs);
-        } catch (Exception e) {
-            LOG.error("Failed to send message, SendResult:" + sendResult + "\n", e);
-            runCounter.count(System.currentTimeMillis() - startMs);
-            collector.fail(tuple);
-            return;
-        }
-
-        collector.ack(tuple);
-    }
-
-    /**
-     * Releases the producer obtained in prepare().
-     */
-    @Override
-    public void cleanup() {
-        ProducerFactory.rmInstance(producerConfig.getProducerId());
-        producer = null;
-    }
-
-    /** This bolt emits nothing, so no output fields are declared. */
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerConfig.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerConfig.java
deleted file mode 100644
index 3ac7fb6..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerConfig.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.alibaba.jstorm.ons.producer;
-
-import java.util.Map;
-
-import com.alibaba.jstorm.ons.OnsConfig;
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
-
-/**
- * Producer-side settings on top of {@link OnsConfig}: the producer id.
- */
-public class ProducerConfig extends OnsConfig{
-
-	private static final long serialVersionUID = 1532254745626913230L;
-
-	private final String producerId ;
-
-	/**
-	 * @param conf configuration map holding the PropertyKeyConst entries
-	 * @throws RuntimeException when the producer id is missing or blank
-	 */
-	public ProducerConfig(Map conf) {
-		super(conf);
-
-		producerId = (String)conf.get(PropertyKeyConst.ProducerId);
-		// reject blank ids as well as null, the same way ConsumerConfig
-		// rejects a blank consumer id
-		if (producerId == null || producerId.trim().length() == 0) {
-			throw new RuntimeException(PropertyKeyConst.ProducerId + " hasn't been set");
-		}
-
-	}
-
-	public String getProducerId() {
-		return producerId;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerFactory.java b/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerFactory.java
deleted file mode 100644
index 203805d..0000000
--- a/jstorm-utility/ons/src/main/java/com/alibaba/jstorm/ons/producer/ProducerFactory.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package com.alibaba.jstorm.ons.producer;
-
-import com.aliyun.openservices.ons.api.Consumer;
-import com.aliyun.openservices.ons.api.MessageListener;
-import com.aliyun.openservices.ons.api.ONSFactory;
-import com.aliyun.openservices.ons.api.Producer;
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
-import org.apache.log4j.Logger;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-public class ProducerFactory {
-
-	private static final Logger LOG = Logger.getLogger(ProducerFactory.class);
-
-	public static Map<String, Producer> producers = new HashMap<String, Producer>();
-
-	public static synchronized Producer mkInstance(ProducerConfig producerConfig) throws Exception{
-
-		String producerId = producerConfig.getProducerId();
-		Producer producer = producers.get(producerId);
-		if (producer != null) {
-
-			LOG.info("Producer of " + producerId + " has been created, don't recreate it ");
-			return producer;
-		}
-		
-		Properties properties = new Properties();
-		properties.put(PropertyKeyConst.ProducerId,  producerConfig.getProducerId());
-		properties.put(PropertyKeyConst.AccessKey, producerConfig.getAccessKey());
-		properties.put(PropertyKeyConst.SecretKey, producerConfig.getSecretKey());
-		
-		producer = ONSFactory.createProducer(properties);
-		producer.start();
-
-
-		producers.put(producerId, producer);
-		LOG.info("Successfully create " + producerId + " producer");
-
-		return producer;
-
-	}
-	
-	public static synchronized void rmInstance(String producerId) {
-		Producer producer = producers.remove(producerId);
-		if (producer == null) {
-
-			LOG.info("Producer of " + producerId + " has already been shutdown ");
-			return ;
-		}
-		
-		producer.shutdown();
-		LOG.info("Producer of " + producerId + " has been shutdown ");
-		return ;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/test/main/resources/metaspout.yaml
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/test/main/resources/metaspout.yaml b/jstorm-utility/ons/test/main/resources/metaspout.yaml
deleted file mode 100755
index f007772..0000000
--- a/jstorm-utility/ons/test/main/resources/metaspout.yaml
+++ /dev/null
@@ -1,32 +0,0 @@
-
-#Meta Client Configuration
-# Please refer MetaClientConfig for every setting's details
-meta.topic: "bbl_user"
-meta.consumer.group: "bbl_user"
-meta.subexpress: "*"
-#meta.nameserver: ""
-#meta.pull.interval.ms: 0
-#meta.max.fail.times: 5
-#meta.internal.queue.size: 256
-#meta.batch.send.msg.size: 16
-#meta.batch.pull.msg.size: 32
-#meta.pull.thread.num: 4
-#meta.spout.auto.ack: false
-#meta.spout.flow.contro: true
-#yyyyMMddHHmmss
-meta.consumer.start.timestamp: "20141011000000"
-#meta.extra.properties: 
-
-topology.name: test_meta_spout
-topology.version: 1.0.0
-topology.workers: 5
-topology.max.spout.pending: 10
-topology.acker.executors: 1
-
-topology.debug: false
-topology.debug.recv.tuple: false
-storm.cluster.mode: local
-
-topology.spout.parallel: 2
-topology.writer.parallel: 1
-

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/rocket-mq
----------------------------------------------------------------------
diff --git a/jstorm-utility/rocket-mq b/jstorm-utility/rocket-mq
deleted file mode 160000
index 372e9d8..0000000
--- a/jstorm-utility/rocket-mq
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 372e9d87667272e6da7b5501d6d7dd2bad41ce6f

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/.gitignore
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/.gitignore b/jstorm-utility/topology-monitor/.gitignore
deleted file mode 100755
index 2bf102a..0000000
--- a/jstorm-utility/topology-monitor/.gitignore
+++ /dev/null
@@ -1,13 +0,0 @@
-# Lines that start with '#' are comments.
-*~
-*.diff
-*#
-.classpath
-.project
-.settings
-bin
-*.class
-.eclipse
-target
-*.iml
-*.versionsBackup

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/README.md
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/README.md b/jstorm-utility/topology-monitor/README.md
deleted file mode 100755
index ad4f892..0000000
--- a/jstorm-utility/topology-monitor/README.md
+++ /dev/null
@@ -1,2 +0,0 @@
-storm-util
-==========

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/pom.xml b/jstorm-utility/topology-monitor/pom.xml
deleted file mode 100755
index 714b3e3..0000000
--- a/jstorm-utility/topology-monitor/pom.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<groupId>com.dianping.cosmos</groupId>
-	<artifactId>storm-util</artifactId>
-	<packaging>jar</packaging>
-	<version>1.3-SNAPSHOT</version>
-	<name>storm-util</name>
-	<url>http://maven.apache.org</url>
-	<dependencies>
-		<dependency>
-			<groupId>com.dianping</groupId>
-			<artifactId>blackhole-consumer</artifactId>
-			<version>2.0.5</version>
-		</dependency>
-		<dependency>
-			<groupId>com.dianping</groupId>
-			<artifactId>blackhole-common</artifactId>
-			<version>2.0.5</version>
-		</dependency>
-		<dependency>
-			<groupId>redis.clients</groupId>
-			<artifactId>jedis</artifactId>
-			<version>2.4.2</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.storm</groupId>
-			<artifactId>storm-core</artifactId>
-			<version>0.9.1-incubating</version>
-			 <exclusions>
-				<exclusion>
-					 <groupId>ch.qos.logback</groupId>  
-					 <artifactId>logback-classic</artifactId>  
-				</exclusion>
-				<exclusion>
-				     <groupId>org.slf4j</groupId>
-					 <artifactId>log4j-over-slf4j</artifactId>
-				</exclusion>
-			</exclusions>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.dianping.puma</groupId>
-			<artifactId>puma-client</artifactId>
-			<version>0.1.1</version>
-		</dependency>
-		<dependency>
-			<groupId>log4j</groupId>
-			<artifactId>log4j</artifactId>
-			<version>1.2.14</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.slf4j</groupId>
-			<artifactId>slf4j-log4j12</artifactId>
-			<version>1.5.11</version>
-		</dependency>
-		<dependency>
-			<groupId>org.slf4j</groupId>
-			<artifactId>slf4j-api</artifactId>
-			<version>1.5.11</version>
-		</dependency>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>3.8.1</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.dianping.cat</groupId>
-			<artifactId>cat-core</artifactId>
-			<version>1.0.5</version>
-		</dependency>
-	</dependencies>
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-source-plugin</artifactId>
-				<version>2.2</version>
-				<executions>
-					<execution>
-						<goals>
-							<goal>jar-no-fork</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-	<distributionManagement>
-		<repository>
-			<id>dianping.repo</id>
-			<name>Dian Ping internal repository for released artifacts</name>
-			<url>http://mvn.dianpingoa.com/dianping-releases</url>
-		</repository>
-		<snapshotRepository>
-			<id>dianping.repo.snapshots</id>
-			<name>mvn.dianpingoa.com-snapshots</name>
-			<url>http://mvn.dianpingoa.com/dianping-snapshots</url>
-		</snapshotRepository>
-	</distributionManagement>
-<repositories>
-    <repository>
-      <id>clojars</id>
-      <url>http://clojars.org/repo/</url>
-    </repository>
-</repositories>
-
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeBlockingQueueSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeBlockingQueueSpout.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeBlockingQueueSpout.java
deleted file mode 100755
index 50e870a..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeBlockingQueueSpout.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package com.dianping.cosmos;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.metric.api.CountMetric;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import com.dianping.cosmos.util.CatMetricUtil;
-import com.dianping.cosmos.util.Constants;
-import com.dianping.lion.client.LionException;
-import com.dp.blackhole.consumer.Consumer;
-import com.dp.blackhole.consumer.ConsumerConfig;
-import com.dp.blackhole.consumer.MessageStream;
-
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class BlackholeBlockingQueueSpout implements IRichSpout {
-    private static final long serialVersionUID = 386827585122587595L;
-    public static final Logger LOG = LoggerFactory.getLogger(BlackholeBlockingQueueSpout.class);
-    private SpoutOutputCollector collector;
-    private String topic;
-    private String group;
-    private MessageStream stream;
-    private Consumer consumer;
-    private MessageFetcher fetchThread;
-    private int warnningStep = 0;
-    private transient CountMetric _spoutMetric;
-
-    public BlackholeBlockingQueueSpout(String topic, String group) {
-        this.topic = topic;
-        this.group = group;
-    }
-    
-    @Override
-    public void open(Map conf, TopologyContext context,
-            SpoutOutputCollector _collector) {
-        collector = _collector;
-        _spoutMetric = new CountMetric();
-        context.registerMetric(CatMetricUtil.getSpoutMetricName(topic, group),  
-                _spoutMetric, Constants.EMIT_FREQUENCY_IN_SECONDS);
-        
-        ConsumerConfig config = new ConsumerConfig();
-        try {
-            consumer = new Consumer(topic, group, config);
-        } catch (LionException e) {
-            throw new RuntimeException(e);
-        }
-        consumer.start();
-        stream = consumer.getStream();
-        
-        fetchThread = new MessageFetcher(stream);
-        new Thread(fetchThread).start();
-    }
-
-    @Override
-    public void close() {
-        fetchThread.shutdown();
-    }
-
-    @Override
-    public void activate() {
-        
-    }
-
-    @Override
-    public void deactivate() {        
-    }
-
-    @Override
-    public void nextTuple() {
-        String message = fetchThread.pollMessage();
-        if (message != null) {
-            collector.emit(topic, new Values(message));
-            _spoutMetric.incr();
-        } else {
-            Utils.sleep(100);
-            warnningStep++;
-            if (warnningStep % 100 == 0) {
-                LOG.warn("Queue is empty, cannot poll message.");
-            }
-        }
-    }
-        
-    @Override
-    public void ack(Object msgId) {
-        LOG.debug("ack: " + msgId);
-        
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        LOG.info("fail: " + msgId);   
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declareStream(topic, new Fields("event"));
-    }
-
-    @Override
-    public Map getComponentConfiguration(){
-         Map<String, Object> conf = new HashMap<String, Object>();
-         return conf;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeSpout.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeSpout.java
deleted file mode 100755
index 86dfffb..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/BlackholeSpout.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.dianping.cosmos;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.metric.api.CountMetric;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import com.dianping.cosmos.util.CatMetricUtil;
-import com.dianping.cosmos.util.Constants;
-import com.dianping.lion.client.LionException;
-import com.dp.blackhole.consumer.Consumer;
-import com.dp.blackhole.consumer.ConsumerConfig;
-import com.dp.blackhole.consumer.MessageStream;
-
-@SuppressWarnings({"rawtypes"})
-public class BlackholeSpout implements IRichSpout {
-    private static final long serialVersionUID = 1L;
-
-    public static final Logger LOG = LoggerFactory.getLogger(BlackholeSpout.class);
-    
-    private SpoutOutputCollector collector;
-    private String topic;
-    private String group;
-    private MessageStream stream;
-    private Consumer consumer;
-    private transient CountMetric _spoutMetric;
-
-    public BlackholeSpout(String topic, String group) {
-        this.topic = topic;
-        this.group = group;
-    }
-    
-    @Override
-    public void open(Map conf, TopologyContext context,
-            SpoutOutputCollector _collector) {
-        collector = _collector;
-        _spoutMetric = new CountMetric();
-        context.registerMetric(CatMetricUtil.getSpoutMetricName(topic, group),  
-                _spoutMetric, Constants.EMIT_FREQUENCY_IN_SECONDS);
-        
-        ConsumerConfig config = new ConsumerConfig();
-        try {
-            consumer = new Consumer(topic, group, config);
-        } catch (LionException e) {
-            throw new RuntimeException(e);
-        }
-        consumer.start();
-        stream = consumer.getStream();
-    }
-
-    @Override
-    public void close() {        
-    }
-
-    @Override
-    public void activate() {        
-    }
-
-    @Override
-    public void deactivate() {        
-    }
-
-    @Override
-    public void nextTuple() {
-        for (String message : stream) {
-            collector.emit(topic, new Values(message));
-            _spoutMetric.incr();
-        }
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        LOG.debug("ack: " + msgId);
-        
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        LOG.info("fail: " + msgId);   
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declareStream(topic, new Fields("event"));
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/MessageFetcher.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/MessageFetcher.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/MessageFetcher.java
deleted file mode 100755
index b379dd3..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/MessageFetcher.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package com.dianping.cosmos;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.dp.blackhole.consumer.MessageStream;
-
-public class MessageFetcher implements Runnable {
-    public static final Logger LOG = LoggerFactory.getLogger(MessageFetcher.class);
-    private final int MAX_QUEUE_SIZE = 1000;
-    private final int TIME_OUT = 5000;
-
-    private BlockingQueue<String> emitQueue;
-    private MessageStream stream;
-
-    private volatile boolean running;
-    public MessageFetcher(MessageStream stream) {
-        this.running = true;
-        this.stream = stream;
-        this.emitQueue = new LinkedBlockingQueue<String>(MAX_QUEUE_SIZE);
-    }
-    
-    @Override
-    public void run() {
-        while (running) {
-            for (String message : stream) {
-                try {
-                    while(!emitQueue.offer(message, TIME_OUT, TimeUnit.MILLISECONDS)) {
-                        LOG.error("Queue is full, cannot offer message.");
-                    }
-                } catch (InterruptedException e) {
-                    LOG.error("Thread Interrupted");
-                    running = false;
-                }
-            }
-        }
-    }
-    
-    public String pollMessage() {
-        return emitQueue.poll();
-    }
-    
-    public void shutdown() {
-        this.running = false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/PumaSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/PumaSpout.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/PumaSpout.java
deleted file mode 100755
index fb1ff2a..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/PumaSpout.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package com.dianping.cosmos;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import com.dianping.puma.api.ConfigurationBuilder;
-import com.dianping.puma.api.EventListener;
-import com.dianping.puma.api.PumaClient;
-import com.dianping.puma.core.event.ChangedEvent;
-import com.dianping.puma.core.event.RowChangedEvent;
-
-
-public class PumaSpout implements IRichSpout{
-    public static final Logger LOG = LoggerFactory.getLogger(PumaSpout.class);
-    
-    private SpoutOutputCollector collector;
-    private PumaEventListener listener;
-    private BlockingQueue<RowChangedEvent> receiveQueue;
-    private Map<String, RowChangedEvent> waitingForAck;
-    
-    private Map<String, String[]> watchTables;
-    private String pumaHost;
-    private int pumaPort;
-    private String pumaName;
-    private String pumaTarget;
-    private int pumaServerId;
-    private String pumaSeqFileBase;
-    
-    public PumaSpout(String host, int port, String name, String target, HashMap<String, String[]> tables) {
-        this(host, port, name, target, tables, null);
-    }
-    
-    public PumaSpout(String host, int port, String name, String target, HashMap<String, String[]> tables, String seqFileBase) {
-        this(host, port, name, target, tables, 9999, seqFileBase);
-    }
-    
-    public PumaSpout(String host, int port, String name, String target, HashMap<String, String[]> tables, int serverId, String seqFileBase) {
-        pumaHost = host;
-        pumaPort = port;
-        pumaName = name;
-        pumaTarget = target;
-        watchTables = tables;
-        pumaServerId = serverId;
-        pumaSeqFileBase = seqFileBase;
-    }
-    
-    protected static String getMsgId(RowChangedEvent e) {
-        return e.getBinlogServerId() + "." + e.getBinlog() + "." + e.getBinlogPos();
-    }
-    
-    protected static String getStreamId(RowChangedEvent e) {
-        return e.getDatabase() + "." + e.getTable();
-    }
-    
-    class PumaEventListener implements EventListener {
-
-        @Override
-        public void onEvent(ChangedEvent event) throws Exception {
-            if (!(event instanceof RowChangedEvent)) {
-                LOG.error("received event " + event +" which is not a RowChangedEvent");
-                return;
-            }
-            RowChangedEvent e = (RowChangedEvent)event;
-            receiveQueue.add(e);
-        }
-
-        @Override
-        public boolean onException(ChangedEvent event, Exception e) {
-            return false;
-        }
-
-        @Override
-        public void onConnectException(Exception e) {
-            // TODO Auto-generated method stub
-            
-        }
-
-        @Override
-        public void onConnected() {
-            LOG.info("pumaspout connected");
-        }
-
-        @Override
-        public void onSkipEvent(ChangedEvent event) {
-            // TODO Auto-generated method stub
-            
-        }
-        
-    }
-    
-    @Override
-    public void ack(Object msgId) {
-        LOG.debug("ack: " + msgId);
-        waitingForAck.remove(msgId);
-    }
-
-    @Override
-    public void activate() {
-    }
-
-    @Override
-    public void close() {
-        // TODO Auto-generated method stub
-        
-    }
-
-    @Override
-    public void deactivate() {
-        // TODO Auto-generated method stub
-        
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        LOG.debug("fail: " + msgId + ", resend event");
-        RowChangedEvent event = waitingForAck.get(msgId);
-        collector.emit(getStreamId(event), new Values(event), getMsgId(event));
-    }
-
-    @Override
-    public void nextTuple() {
-        RowChangedEvent event = null;
-        try {
-            event = receiveQueue.take();
-        } catch (InterruptedException e) {
-            return;
-        }
-        
-        String msgId = getMsgId(event);
-        collector.emit(getStreamId(event), new Values(event), msgId);
-        waitingForAck.put(msgId, event);
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector _collector) {
-        collector = _collector;
-        receiveQueue = new LinkedBlockingQueue<RowChangedEvent>();
-        waitingForAck = new ConcurrentHashMap<String, RowChangedEvent>();
-        
-        ConfigurationBuilder configBuilder = new ConfigurationBuilder();
-        configBuilder.ddl(false);
-        configBuilder.dml(true);
-        configBuilder.transaction(false);
-        if (pumaSeqFileBase != null) {
-            configBuilder.seqFileBase(pumaSeqFileBase);
-        }
-        configBuilder.host(pumaHost);
-        configBuilder.port(pumaPort);
-        configBuilder.serverId(pumaServerId);
-        configBuilder.name(pumaName);
-        for (Entry<String, String[]> e : watchTables.entrySet()) {
-            String db = e.getKey();
-            String[] tabs = e.getValue();
-            configBuilder.tables(db, tabs);
-        }
-        configBuilder.target(pumaTarget);     
-        PumaClient pc = new PumaClient(configBuilder.build());
-        
-        listener = new PumaEventListener();
-        pc.register(listener);
-        pc.start();
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for (Entry<String, String[]> entry : watchTables.entrySet()) {
-            String db = entry.getKey();
-            for (String table : entry.getValue()) {
-                String dbTable = db + "." + table;
-                declarer.declareStream(dbTable, new Fields("event"));
-            }
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/RedisSinkBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/RedisSinkBolt.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/RedisSinkBolt.java
deleted file mode 100755
index d2e02e1..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/RedisSinkBolt.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package com.dianping.cosmos;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.exceptions.JedisConnectionException;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-
-public class RedisSinkBolt implements IRichBolt {
-    private final Log LOG = LogFactory.getLog(RedisSinkBolt.class);
-    private OutputCollector collector;
-    private JedisPool pool;
-    private Updater updater;
-    
-    private String redisHost;
-    private int redisPort;
-    private int timeout;
-    private int retryLimit;
-    
-    public RedisSinkBolt(String redisHost, int redisPort) {
-        this(redisHost, redisPort, 50, 3);
-    }
-    
-    public RedisSinkBolt(String redisHost, int redisPort, int retryLimit) {
-        this(redisHost, redisPort, 50, retryLimit);
-    }
-    
-    public RedisSinkBolt(String redisHost, int redisPort, int timeout, int retryLimit) {
-        this.redisHost = redisHost;
-        this.redisPort = redisPort;
-        this.timeout = timeout;
-        this.retryLimit = retryLimit;
-    }
-    
-    public void setUpdater(Updater updater) {
-        this.updater = updater;
-    }
-    
-    @Override
-    public void prepare(Map conf, TopologyContext context,
-            OutputCollector collector) {
-        this.collector = collector;
-        
-        GenericObjectPoolConfig pconf = new GenericObjectPoolConfig();
-        pconf.setMaxWaitMillis(2000);
-        pconf.setMaxTotal(1000);
-        pconf.setTestOnBorrow(false);
-        pconf.setTestOnReturn(false);
-        pconf.setTestWhileIdle(true);
-        pconf.setMinEvictableIdleTimeMillis(120000);
-        pconf.setTimeBetweenEvictionRunsMillis(60000);
-        pconf.setNumTestsPerEvictionRun(-1);
-        
-        pool = new JedisPool(pconf, redisHost, redisPort, timeout);
-    }
-
-    private byte[] retryGet(byte[] key) {
-        int retry = 0;
-        byte[] ret;
-        while (true) {
-            Jedis jedis = null;
-            try {
-                jedis = pool.getResource();
-                ret = jedis.get(key);
-                return ret;
-            } catch (JedisConnectionException e) {
-                if (jedis != null) {
-                    pool.returnBrokenResource(jedis);
-                    jedis = null;
-                }
-                if (retry > retryLimit) {
-                    throw e;
-                }
-                retry++;
-            } finally {
-                if (jedis != null) {
-                    pool.returnResource(jedis);
-                }
-            }
-        }
-    }
-    
-    private String retrySet(byte[] key, byte[] value) {
-        int retry = 0;
-        String ret;
-        while (true) {
-            Jedis jedis = null;
-            try {
-                jedis = pool.getResource();
-                ret = jedis.set(key, value);
-                return ret;
-            } catch (JedisConnectionException e) {
-                if (jedis != null) {
-                    pool.returnBrokenResource(jedis);
-                    jedis = null;
-                }
-                if (retry > retryLimit) {
-                    throw e;
-                }
-                retry++;
-            } finally {
-                if (jedis != null) {
-                    pool.returnResource(jedis);
-                }
-            }
-            
-        }
-    }
-    
-    @Override
-    public void execute(Tuple input) {
-        byte[] key = input.getBinary(0);
-        byte[] value = input.getBinary(1);
-        
-        if (key == null || value == null) {
-            collector.ack(input);
-            return;
-        }
-        
-        try {
-            if (updater != null) {
-                byte[] oldValue = retryGet(key);
-                byte[] newValue = updater.update(oldValue, value);
-                if (newValue == null) {
-                    collector.ack(input);
-                    return;
-                }
-                retrySet(key, newValue);
-                collector.ack(input);
-                return;
-            }
-            
-            retrySet(key, value);
-            collector.ack(input);
-        } catch (JedisConnectionException e) {
-            LOG.warn("JedisConnectionException catched ", e);
-            collector.fail(input);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        pool.destroy();
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        // TODO Auto-generated method stub
-        
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/Updater.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/Updater.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/Updater.java
deleted file mode 100755
index 04eedf2..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/Updater.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package com.dianping.cosmos;
-
-import java.io.Serializable;
-
-public interface Updater extends Serializable {
-
-    byte[] update(byte[] oldValue, byte[] newValue);
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/metric/CatMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/metric/CatMetricsConsumer.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/metric/CatMetricsConsumer.java
deleted file mode 100755
index 1e2ff06..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/metric/CatMetricsConsumer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.dianping.cosmos.metric;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
-
-import com.dianping.cosmos.monitor.HttpCatClient;
-import com.dianping.cosmos.util.CatMetricUtil;
-
-/**
- * Listens for all metrics, dumps them to cat
- *
- * To use, add this to your topology's configuration:
- *   conf.registerMetricsConsumer(com.dianping.cosmos.metric.CatSpoutMetricsConsumer.class, 1);
- *
- * Or edit the storm.yaml config file:
- *
- *   topology.metrics.consumer.register:
- *     - class: "com.dianping.cosmos.metric.CatSpoutMetricsConsumer"
- *       parallelism.hint: 1
- *
- */
-@SuppressWarnings("rawtypes")
-public class CatMetricsConsumer implements IMetricsConsumer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(CatMetricsConsumer.class);
-    private String stormId;
-
-    @Override
-    public void prepare(Map stormConf, Object registrationArgument, 
-            TopologyContext context, IErrorReporter errorReporter) {
-        stormId = context.getStormId();
-    }
-
-
-    @Override
-    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-        for (DataPoint p : dataPoints) {
-            try{
-                if(CatMetricUtil.isCatMetric(p.name)){
-                    HttpCatClient.sendMetric(getTopologyName(), 
-                            CatMetricUtil.getCatMetricKey(p.name), "sum", String.valueOf(p.value));
-                }
-            }
-            catch(Exception e){
-                LOGGER.warn("send metirc 2 cat error.", e);
-            }
-        }
-    }
-    
-    private String getTopologyName(){
-       return StringUtils.substringBefore(stormId, "-");
-    }
-
-    @Override
-    public void cleanup() { 
-    }
-    
-    public static void main(String[] args){
-        CatMetricsConsumer c = new CatMetricsConsumer();
-        c.stormId = "HippoUV_25-15-1410857734";
-        System.out.println(c.getTopologyName());
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpCatClient.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpCatClient.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpCatClient.java
deleted file mode 100755
index 9fe6987..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpCatClient.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.dianping.cosmos.monitor;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Static helper that reports a metric to one of the configured cat servers
- * with an HTTP GET. After a failed request the next call uses the next server
- * in the list.
- */
-public class HttpCatClient {
-    private static final Logger LOGGER = LoggerFactory.getLogger(HttpCatClient.class);
-    
-    private HttpCatClient(){
-    }
-
-    private static final HttpClientService httClientSerivce = new HttpClientService();
-        
-    // Addresses of the servers to send to; filled in by the static block below.
-    private static final List<String> CAT_SERVERS = new ArrayList<String>();
-    // Number of failed requests so far; selects the current server.
-    private static final AtomicInteger CURRENT_SERVER_INDEX = new AtomicInteger(0);
-    
-    static{
-        CAT_SERVERS.add("http://cat02.nh:8080/");
-        CAT_SERVERS.add("http://cat03.nh:8080/");
-        CAT_SERVERS.add("http://cat04.nh:8080/");
-        CAT_SERVERS.add("http://cat05.nh:8080/");
-        CAT_SERVERS.add("http://cat06.nh:8080/");
-    }
-    
-    /**
-     * Sends one metric value to the current cat server. Errors are logged and
-     * not rethrown; an error also advances the server used by later calls.
-     *
-     * @param domain value of the {@code domain} query parameter
-     * @param key    value of the {@code key} query parameter
-     * @param op     value of the {@code op} query parameter; also used as the
-     *               name of the parameter that carries {@code value}
-     * @param value  the metric value
-     */
-    public static void sendMetric(String domain, String key, String op, String value){
-        String server = getServer();
-        try{
-            StringBuilder request = new StringBuilder();
-            request.append(server);
-            request.append("cat/r/monitor?timestamp=");
-            request.append(System.currentTimeMillis());
-            request.append("&group=Storm&domain=");
-            request.append(encode(domain));
-            request.append("&key=");
-            request.append(encode(key));
-            request.append("&op=");
-            request.append(encode(op));
-            request.append("&" + encode(op) +"=");
-            request.append(encode(value));
-            httClientSerivce.get(request.toString());
-        }
-        catch(Exception e){
-            CURRENT_SERVER_INDEX.getAndIncrement();
-            LOGGER.error("send to cat " + server + " error.",  e);
-        }
-    }
-    
-    /**
-     * Percent-encodes a query value so that characters such as {@code &},
-     * {@code #} or spaces cannot corrupt the request URL. A {@code null}
-     * argument is encoded as the text "null", which is what plain string
-     * concatenation would have produced.
-     */
-    private static String encode(String raw) throws java.io.UnsupportedEncodingException {
-        return java.net.URLEncoder.encode(String.valueOf(raw), "UTF-8");
-    }
-
-    /**
-     * Picks the server for the current failure count.
-     */
-    private static String getServer(){
-        // Mask the sign bit so the index stays non-negative after the counter overflows.
-        int index = (CURRENT_SERVER_INDEX.get() & Integer.MAX_VALUE) % CAT_SERVERS.size();
-        return CAT_SERVERS.get(index);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpClientService.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpClientService.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpClientService.java
deleted file mode 100755
index 843e5da..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/HttpClientService.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package com.dianping.cosmos.monitor;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URL;
-import java.util.List;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.NameValuePair;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.params.CoreConnectionPNames;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Small HTTP helper used to write metric information to cat.
- *
- * Every request runs on its own {@link HttpClient}, which is shut down as soon
- * as the response has been read so that its connections are released.
- *
- * @author xinchun.wang
- *
- */
-public class HttpClientService {
-	
-	private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientService.class);
-	
-	/**
-	 * Issues a GET request and returns the response body.
-	 *
-	 * @param url    the address to request
-	 * @param useURI when true the request URI is rebuilt from the parsed parts of {@code url}
-	 * @return the body decoded as UTF-8, or {@code null} when the response has no body
-	 * @throws Exception when the request fails or the status is not 200
-	 */
-	protected String excuteGet(String url, boolean useURI) throws Exception {
-		HttpClient httpClient = getHttpClient();
-		try {
-			HttpUriRequest request = getGetRequest(url, useURI);
-			HttpResponse httpResponse = httpClient.execute(request);
-			return parseResponse(url, httpResponse);
-		} finally {
-			// The client is single-use; release its connections even on failure.
-			httpClient.getConnectionManager().shutdown();
-		}
-	}
-	
-	/**
-	 * Issues a form-encoded POST request and returns the response body.
-	 *
-	 * @param url  the address to post to
-	 * @param nvps the form parameters
-	 * @return the body decoded as UTF-8, or {@code null} when the response has no body
-	 * @throws Exception when the request fails or the status is not 200
-	 */
-	protected String excutePost(String url, List<NameValuePair> nvps) throws Exception {
-		HttpClient httpClient = getHttpClient();
-		try {
-			HttpPost httpPost = new HttpPost(url);
-			httpPost.setEntity(new UrlEncodedFormEntity(nvps));
-			HttpResponse httpResponse = httpClient.execute(httpPost);
-			return parseResponse(url, httpResponse);
-		} finally {
-			// The client is single-use; release its connections even on failure.
-			httpClient.getConnectionManager().shutdown();
-		}
-	}
-
-	/**
-	 * Checks the status code and reads the body.
-	 *
-	 * @return the body decoded as UTF-8, or {@code null} when there is no entity
-	 * @throws Exception when the status is not 200 or the body cannot be read
-	 */
-	private String parseResponse(String url, HttpResponse httpResponse)
-			throws Exception {
-		int status = httpResponse.getStatusLine().getStatusCode();
-		if(status != 200){
-			String errorMsg = "Error occurs in calling acl service: " + url + ", with status:" + status;
-			throw new Exception(errorMsg);
-		}
-		HttpEntity entity = httpResponse.getEntity();
-		if(entity == null){
-			// Let the callers' null check report the missing body.
-			return null;
-		}
-		return EntityUtils.toString(entity, "UTF-8");
-	}
-
-	/**
-	 * Creates a client with 5000 ms connect and socket timeouts.
-	 */
-	private HttpClient getHttpClient() {
-		HttpClient httpClient = new DefaultHttpClient();
-		httpClient.getParams().setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 5000);
-		httpClient.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT, 5000);
-		return httpClient;
-	}
-
-	/**
-	 * Builds the GET request, either directly from the string or from a URI
-	 * assembled out of the protocol, host, port, path and query of the URL.
-	 */
-	private HttpUriRequest getGetRequest(String url, boolean useURI) throws Exception {
-		if(!useURI){
-			return new HttpGet(url);
-		}
-		URL requestURL = new URL(url);
-		URI uri = new URI(
-			requestURL.getProtocol(),
-			null,
-			requestURL.getHost(), 
-			requestURL.getPort(),
-			requestURL.getPath(), 
-			requestURL.getQuery(), 
-			null);
-		return new HttpGet(uri);
-	}
-	
-	/**
-	 * Requests the address as given and logs an error when no body comes back.
-	 *
-	 * @throws Exception when the request fails or the status is not 200
-	 */
-	public void get(String url) throws Exception{
-		String response = excuteGet(url, false);
-		if(response == null){
-			LOGGER.error("call uri error, response is null, uri = " + url);
-		}
-	}
-	
-	/**
-	 * Same as {@link #get(String)} but rebuilds the request URI from the parsed URL.
-	 *
-	 * @throws Exception when the request fails or the status is not 200
-	 */
-	public void getByURI(String url) throws Exception{
-		String response = excuteGet(url, true);
-		if(response == null){
-			LOGGER.error("call uri error, response is null, uri = " + url);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/SpoutCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/SpoutCounter.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/SpoutCounter.java
deleted file mode 100755
index 4e06df3..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/SpoutCounter.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.dianping.cosmos.monitor;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Two independent atomic tallies: a count of repeats and a running total of tuples.
- */
-public class SpoutCounter {
-    private final AtomicLong repeats = new AtomicLong();
-    private final AtomicLong tuples = new AtomicLong();
-
-    /** Adds one to the repeat tally. */
-    public void incrRepeatCounter(){
-        repeats.incrementAndGet();
-    }
-
-    /** @return the current repeat tally */
-    public long getRepeatCounter(){
-        return repeats.get();
-    }
-
-    /**
-     * Adds the given amount to the tuple total.
-     *
-     * @param increment the amount to add
-     */
-    public void incrTupleCounter(long increment){
-        tuples.addAndGet(increment);
-    }
-
-    /** @return the current tuple total */
-    public long getTupleCounter(){
-        return tuples.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/TopologyMonitor.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/TopologyMonitor.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/TopologyMonitor.java
deleted file mode 100755
index 88787ca..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/TopologyMonitor.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.dianping.cosmos.monitor;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.metric.api.IMetricsConsumer.DataPoint;
-import backtype.storm.metric.api.IMetricsConsumer.TaskInfo;
-
-/**
- * Keeps per-task tuple counts from reported data points and logs an error when
- * a task reports too few tuples. It also has helpers that start and kill a
- * topology through the storm command line.
- */
-public class TopologyMonitor {
-    private static final Logger LOGGER = LoggerFactory.getLogger(TopologyMonitor.class);
-
-    // Shared by all instances; keyed by the id of the reporting task.
-    private static ConcurrentHashMap<Integer, SpoutCounter> spoutCounterMap = new ConcurrentHashMap<Integer, SpoutCounter>();
-
-    /**
-     * Records one data point for the reporting task and, once that task has
-     * reported 12 times, checks whether enough tuples were seen.
-     *
-     * @param stormId  id of the topology; only used by the disabled restart call
-     * @param taskInfo identifies the reporting task
-     * @param p        the data point; its value must be a {@link Number} or text
-     *                 that parses as a long
-     * @throws NumberFormatException when the value is neither of those
-     */
-    public void monitorStatus(String stormId, TaskInfo taskInfo, DataPoint p) {
-        SpoutCounter counter = spoutCounterMap.get(taskInfo.srcTaskId);
-        if(counter == null){
-            // putIfAbsent keeps a single counter per task when two threads race here.
-            SpoutCounter fresh = new SpoutCounter();
-            counter = spoutCounterMap.putIfAbsent(taskInfo.srcTaskId, fresh);
-            if(counter == null){
-                counter = fresh;
-            }
-        }
-        counter.incrRepeatCounter();
-        counter.incrTupleCounter(toLong(p.value));
-        // 12 consecutive reports; the original comment calls this one minute
-        // (presumably one report every 5 seconds - confirm).
-        if(counter.getRepeatCounter() >= 12){
-            LOGGER.info("last minute tuple = " + counter.getTupleCounter());
-            // The data volume is below the threshold.
-            if(counter.getTupleCounter() <= 10000){
-                LOGGER.error("spout has problem, restar topology....");
-                //restartTopology(stormId);
-            }
-            // NOTE(review): this resets the counters of every task, not only the
-            // one that reached 12 reports - confirm that this is intended.
-            spoutCounterMap.clear();
-        }
-    }
-    
-    /**
-     * Converts a data point value to a long. Numbers are narrowed with
-     * {@link Number#longValue()}; anything else is parsed from its text form.
-     */
-    private static long toLong(Object value){
-        if(value instanceof Number){
-            return ((Number) value).longValue();
-        }
-        return Long.parseLong(String.valueOf(value));
-    }
-
-    /**
-     * Starts a topology whose numeric suffix is one higher than the current
-     * one and then kills the current topology.
-     *
-     * Example storm id: MobileUV_7-212-1409657868
-     *
-     * @param stormId id of the running topology
-     * @throws NumberFormatException when the text between the first "_" and the
-     *         first "-" is not an integer
-     */
-    public void restartTopology(String stormId){
-        String currentTopology = StringUtils.substringBefore(stormId, "-");
-        String topologyPrefix = StringUtils.substringBefore(currentTopology, "_");
-        String topologyIndex =  StringUtils.substringAfter(currentTopology, "_");
-        int newIndex = Integer.parseInt(topologyIndex) + 1;
-        String newTopologyName = topologyPrefix + "_" + newIndex;
-        LOGGER.info("new topology name = " + newTopologyName);
-        execStartCommand(newTopologyName);
-        LOGGER.info("execStartCommand finish ..");
-        execShutdownCommand(currentTopology);
-        LOGGER.info("execShutdownCommand finish ..");
-    }
-    
-    /**
-     * Submits the topology jar under the given name and waits for the command to end.
-     *
-     * @param topologyName name passed to the topology's main class
-     */
-    public void execStartCommand(String topologyName){
-        runCommand(new String[]{
-                "/usr/local/storm/bin/storm",  
-                "jar", 
-                "/home/hadoop/topology/meteor-traffic-0.0.1.jar", 
-                "com.dianping.data.warehouse.traffic.mobile.MobileUVTopology",  
-                topologyName});
-    }
-    
-    /**
-     * Kills the named topology with a wait time of 10 and waits for the command to end.
-     *
-     * @param topologyName name of the topology to kill
-     */
-    public void execShutdownCommand(String topologyName){
-        runCommand(new String[]{
-                "/usr/local/storm/bin/storm",  
-                "kill", 
-                topologyName,
-                "10"});
-    }
-    
-    /**
-     * Runs a command and waits for it to finish. Failures are logged, not thrown.
-     */
-    private void runCommand(String[] command){
-        try {
-            ProcessBuilder builder = new ProcessBuilder(command);
-            // Merge stderr into stdout so one stream is enough to drain.
-            builder.redirectErrorStream(true);
-            Process process = builder.start();
-            java.io.InputStream output = process.getInputStream();
-            try {
-                // Drain the output; a full pipe would block the child and hang waitFor().
-                byte[] buffer = new byte[4096];
-                while(output.read(buffer) != -1){
-                    // output is discarded
-                }
-            } finally {
-                output.close();
-            }
-            int exitCode = process.waitFor();
-            if(exitCode != 0){
-                LOGGER.warn("command " + command[0] + " exited with code " + exitCode);
-            }
-        } catch (InterruptedException e) {
-            // Keep the interrupt visible to the caller's thread.
-            Thread.currentThread().interrupt();
-            LOGGER.error("", e);
-        } catch (Exception e) {
-            LOGGER.error("", e);
-        }
-    }
-
-    /**
-     * Manual entry point that restarts the topology of a sample storm id.
-     */
-    public static void main(String[] args){
-        TopologyMonitor monitor = new TopologyMonitor();
-        monitor.restartTopology("MobileUV_7-212-1409657868");
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoBolt.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoBolt.java
deleted file mode 100755
index 2dd99bd..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoBolt.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package com.dianping.cosmos.monitor.topology;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.thrift7.TException;
-import org.apache.thrift7.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.ExecutorSummary;
-import backtype.storm.generated.Nimbus.Client;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.SupervisorSummary;
-import backtype.storm.generated.TopologyInfo;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-import com.dianping.cosmos.monitor.HttpCatClient;
-import com.dianping.cosmos.util.Constants;
-import com.dianping.cosmos.util.TupleHelpers;
-
-@SuppressWarnings({ "rawtypes", "unchecked"})
-public class ClusterInfoBolt  extends BaseRichBolt{
-    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInfoBolt.class);
-
-    private static final long serialVersionUID = 1L;
-    private transient Client client;
-    private transient NimbusClient nimbusClient;
-    private OutputCollector collector;
-    private Map configMap = null;
-   
-    @Override
-    public void prepare(Map map, TopologyContext topologycontext,
-            OutputCollector outputcollector) {
-        this.collector = outputcollector;
-        this.configMap = map;
-        initClient(configMap);
-    }
-    private void initClient(Map map) {
-        nimbusClient = NimbusClient.getConfiguredClient(map);
-        client = nimbusClient.getClient();
-    }
-    @Override
-    public void execute(Tuple tuple) {
-        if (TupleHelpers.isTickTuple(tuple)) {
-            if(nimbusClient == null){
-                initClient(configMap);
-            }
-            getClusterInfo(client);
-            collector.emit(new Values(tuple));
-        }        
-    }
-    
-    private void getClusterInfo(Client client) {
-        try {
-            ClusterSummary clusterSummary = client.getClusterInfo();
-            List<SupervisorSummary> supervisorSummaryList = clusterSummary.get_supervisors();
-            int totalWorkers = 0;
-            int usedWorkers = 0;
-            for(SupervisorSummary summary : supervisorSummaryList){
-                totalWorkers += summary.get_num_workers() ;
-                usedWorkers += summary.get_num_used_workers();
-            }
-            int freeWorkers = totalWorkers - usedWorkers;
-            LOGGER.info("cluster totalWorkers = " + totalWorkers 
-                    + ", usedWorkers = " + usedWorkers 
-                    + ", freeWorkers  = " +  freeWorkers);
-            
-            HttpCatClient.sendMetric("ClusterMonitor", "freeSlots", "avg", String.valueOf(freeWorkers));
-            HttpCatClient.sendMetric("ClusterMonitor", "totalSlots", "avg", String.valueOf(totalWorkers));
-            
-            List<TopologySummary> topologySummaryList = clusterSummary.get_topologies();
-            long clusterTPS = 0l;
-            for(TopologySummary topology : topologySummaryList){
-                long topologyTPS = getTopologyTPS(topology, client);
-                clusterTPS += topologyTPS;
-                if(topology.get_name().startsWith("ClusterMonitor")){
-                    continue;
-                }
-                HttpCatClient.sendMetric(topology.get_name(), topology.get_name() + "-TPS", "avg", String.valueOf(topologyTPS));
-            }
-            HttpCatClient.sendMetric("ClusterMonitor", "ClusterEmitTPS", "avg", String.valueOf(clusterTPS));
-            
-        } catch (TException e) {
-            initClient(configMap);
-            LOGGER.error("get client info error.", e);
-        }
-        catch(NotAliveException nae){
-            LOGGER.warn("topology is dead.", nae);
-        }
-    }
-    
-    protected long getTopologyTPS(TopologySummary topology, Client client) throws NotAliveException, TException{
-        long topologyTps = 0l;
-        String topologyId = topology.get_id();
-        if(topologyId.startsWith("ClusterMonitor")){
-            return topologyTps;
-        }
-        TopologyInfo topologyInfo = client.getTopologyInfo(topologyId);
-        if(topologyInfo == null){
-            return topologyTps;
-        }
-        List<ExecutorSummary> executorSummaryList = topologyInfo.get_executors();
-        for(ExecutorSummary executor : executorSummaryList){
-            topologyTps += getComponentTPS(executor);
-        }
-        LOGGER.info("topology = " + topology.get_name() + ", tps = " + topologyTps);
-        return topologyTps;
-    }
-    
-    private long getComponentTPS(ExecutorSummary executor) {
-        long componentTps = 0l;
-        if(executor == null){
-            return componentTps;
-        }
-        String componentId = executor.get_component_id();
-        
-        if(Utils.isSystemId(componentId)){
-            return componentTps;
-        }
-        if(executor.get_stats() == null){
-            return componentTps;
-        }
-
-        Map<String, Map<String, Long>> emittedMap = executor.get_stats().get_emitted();
-        Map<String, Long> minutesEmitted = emittedMap.get("600");
-        if(minutesEmitted == null){
-            return componentTps;
-        }
-        for(Map.Entry<String, Long> emittedEntry : minutesEmitted.entrySet()){
-            if(Utils.isSystemId(emittedEntry.getKey())){
-                continue;
-            }
-            if(executor.get_uptime_secs() >= 600){
-                componentTps += emittedEntry.getValue() / 600;
-            }
-            if(executor.get_uptime_secs() >= 10 && executor.get_uptime_secs() < 600){
-                componentTps += emittedEntry.getValue() / executor.get_uptime_secs();
-            }   
-        }
-        LOGGER.debug("component = " + componentId + ", tps = " + componentTps);
-        return componentTps;
-    }
-
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {        
-        outputfieldsdeclarer.declare(new Fields("monitor"));
-    }
-    
-    @Override
-    public Map getComponentConfiguration(){
-         Map<String, Object> conf = new HashMap<String, Object>();
-         conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Constants.TPS_COUNTER_FREQUENCY_IN_SECONDS);
-         return conf;
-    }
-        
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoTopology.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoTopology.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoTopology.java
deleted file mode 100755
index 2c9e9c5..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/monitor/topology/ClusterInfoTopology.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.dianping.cosmos.monitor.topology;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-
-/**
- * Entry point that submits the "ClusterMonitor" topology, which consists of a
- * single {@link ClusterInfoBolt} running in one worker.
- */
-public class ClusterInfoTopology {
-    /**
-     * Builds and submits the topology.
-     *
-     * @param args unused
-     * @throws Exception when building or submitting fails
-     */
-    public static void main(String[] args) throws Exception {
-        Config topologyConf = new Config();
-        topologyConf.setNumWorkers(1);
-
-        TopologyBuilder topology = new TopologyBuilder();
-        topology.setBolt("ClusterInfo", new ClusterInfoBolt(), 1);
-
-        StormSubmitter.submitTopology("ClusterMonitor", topologyConf, topology.createTopology());
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatClient.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatClient.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatClient.java
deleted file mode 100755
index 65f4a04..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatClient.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.dianping.cosmos.util;
-import com.dianping.cat.Cat;
-
-/**
- * Holder for the shared {@link Cat} instance.
- */
-public class CatClient {
-
-    // Assigned before the static block below runs; the order is kept as declared.
-    private static final Cat CAT = Cat.getInstance();
-
-    static{
-        Cat.initialize("cat02.nh","cat03.nh","cat04.nh","cat05.nh");
-    }
-
-    private CatClient(){
-    }
-
-    /**
-     * @return the instance obtained from {@code Cat.getInstance()} when this class was loaded
-     */
-    public static Cat getInstance(){
-        return CAT;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatMetricUtil.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatMetricUtil.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatMetricUtil.java
deleted file mode 100755
index 0c4b183..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/CatMetricUtil.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.dianping.cosmos.util;
-
-import org.apache.commons.lang.StringUtils;
-
-public class CatMetricUtil {
-    private static final String CAT_METRIC_NAME_PREFIX = "Cat#"; 
-    
-    /**
-     * 返回BlackHoleSout的metric名称
-     * @param topic
-     * @param group
-     * @return
-     */
-    public static String getSpoutMetricName(String topic, String group){
-        return CAT_METRIC_NAME_PREFIX.concat(topic).concat("[").concat(group).concat("]");
-    }
-    
-    /**
-     * 判断是否cat的metirc
-     * @param dataPointName
-     * @return
-     */
-    public static boolean isCatMetric(String dataPointName){
-        if(StringUtils.isBlank(dataPointName)){
-            return false;
-        }
-        return StringUtils.startsWith(dataPointName, CAT_METRIC_NAME_PREFIX);
-    }
-    
-    
-    
-    /**
-     * 根据metric的名字,返回写入cat上的key
-     * @param spoutMetricName
-     * @return
-     */
-    public static String getCatMetricKey(String spoutMetricName){
-        if(StringUtils.isBlank(spoutMetricName) 
-                || !StringUtils.startsWith(spoutMetricName, CAT_METRIC_NAME_PREFIX)){
-            return "default";
-        }
-        return StringUtils.substringAfter(spoutMetricName, CAT_METRIC_NAME_PREFIX);
-        
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/Constants.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/Constants.java b/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/Constants.java
deleted file mode 100755
index c5edf1a..0000000
--- a/jstorm-utility/topology-monitor/src/main/java/com/dianping/cosmos/util/Constants.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package com.dianping.cosmos.util;
-
-/**
- * Shared interval constants, in seconds.
- */
-public class Constants {
-
-	/** Emit interval of 5 seconds. */
-	public static final int EMIT_FREQUENCY_IN_SECONDS = 5;
-
-	/** Interval of 30 seconds for the TPS counter. */
-	public static final int TPS_COUNTER_FREQUENCY_IN_SECONDS = 30;
-
-}