You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:27 UTC

[48/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/history_cn.md
----------------------------------------------------------------------
diff --git a/history_cn.md b/history_cn.md
index 1b90dd1..e57e9bb 100644
--- a/history_cn.md
+++ b/history_cn.md
@@ -1,5 +1,82 @@
 [JStorm English introduction](http://42.121.19.155/jstorm/JStorm-introduce-en.pptx)
 [JStorm Chinese introduction](http://42.121.19.155/jstorm/JStorm-introduce.pptx)
+#Release 2.0.4-SNAPSHOT
+## New features
+1.完全重构采样系统, 使用全新的Rollingwindow和Metric计算方式,尤其是netty采样数据,另外metric 发送和接收将不通过zk
+2.完全重构web-ui
+3.引入rocketdb,增加nimbus cache layer
+4.梳理所有的zk节点和zk操作, 去掉无用的zk 操作
+5.梳理所有的thrift 数据结构和函数, 去掉无用的rpc函数
+6.将jstorm-client/jstorm-client-extension/jstorm-core整合为jstorm-core
+7.同步依赖和storm一样
+8.同步apache-storm-0.10.0-beta1 java 代码
+9.切换日志系统到logback
+10.升级thrift 到apache thrift 0.9.2
+11. 针对超大型任务600个worker/2000个task以上任务进行优化
+12. 要求 jdk7 or higher
+
+#Release 0.9.7.1
+## New features
+1. 增加Tuple自动batch的支持,以提高TPS以及降低消息处理延迟(task.batch.tuple=true,task.msg.batch.size=4)
+2. localFirst在本地节点处理能力跟不上时,自动对外部节点进行扩容
+3. 任务运行时,支持对任务配置的动态更新
+4. 支持任务对task心跳和task cleanup超时时间的自定义设置
+5. 增加disruptor queue对非阻塞模式TimeoutBlockingWaitStrategy的支持
+6. 增加Netty层消息发送超时时间设置的支持,以及Netty Client配置的优化
+7. 更新Tuple消息处理架构。去除不必要的总接收和总发送队列,减少消息流动环节,提高性能以及降低jstorm自身的cpu消耗。
+8. 增加客户端"--include-jars", 提交任务时,可以依赖额外的jar
+9. 启动nimbus/supervisor时, 如果取得的是127.0.0.0地址时, 拒绝启动
+10. 增加自定义样例
+11. 合并supervisor 的zk同步线程syncSupervisor和worker同步线程syncProcess
+## 配置变更
+1. 默认超时心跳时间设置为4分钟
+2. 修改netty 线程池clientScheduleService大小为5
+## Bug fix
+1. 优化gc参数,4g以下内存的worker默认4个gc线程,4g以上内存, 按内存大小/1g * 1.5原则设置gc线程数量
+2. Fix在bolt处理速度慢时,可能出现的task心跳更新不及时的bug
+3. Fix在一些情况下,netty连接重连时的异常等待bug
+4. 提交任务时, 避免重复创建thrift client
+5. Fix 启动worker失败时,重复下载binary问题
+##运维和脚本
+1. 优化cleandisk.sh脚本, 防止把当前目录删除和/tmp/hsperfdata_admin/
+2. 增加example下脚本执行权限
+3. 添加参数supervisor.host.start: true/false,可以通过脚本start.sh批量控制启动supervisor或不启动supervisor,默认是启动supervisor
+
+#Release 0.9.7
+## New features
+1. 实现topology任务并发动态调整的功能。在任务不下线的情况下,可以动态的对worker,spout, bolt或者ack进行扩容或缩容。rebalance命令被扩展用于支持动态扩容/缩容功能。
+2. 当打开资源隔离时,增加worker对cpu核使用上限的控制
+3. 调整task心跳更新机制。保证能正确反映spout/bolt exectue主线程的状态。
+4. 对worker和task的日志,增加jstorm信息前缀(clusterName, topologyName, ip:port, componentName, taskId, taskIndex)的支持
+5. 对topology任务调度时,增加对supervisor心跳状态的检查,不往无响应的supervisor调度任务
+6. 增加metric查询API,如: task的队列负载情况,worker的cpu,memory使用情况
+7. 增加supervisor上对任务jar包下载的重试,让worker不会因为jar在下载过程中的损坏,而启动失败
+8. 增加ZK Cache功能, 加快zk 读取速度, 并对部分节点采取直读方式
+9. 增加thrift getVersion api, 当客户端和服务器端版本不一致是,报warning
+10. 增加supervisor 心跳检查, 会拒绝分配任务到supervisor心跳超时的supervisor
+11. 更新发送到Alimonitor的user defined metrics 数据结构
+12. 增加客户端exclude-jar 功能, 当客户端提交任务时,可以通过exclude-jar和classloader来解决jar冲突问题。
+## 配置变更
+1. 修改supervisor到nimbus的心跳 超时时间到180秒
+2. 为避免内存outofmemory, 设置storm.messaging.netty.max.pending默认值为4
+3. 设置Nimbus 内存至4G
+4. 调大队列大小 task 队列大小为1024, 总发送队列和总接收队列为2048
+## Bug fix
+1. 短时间能多次restart worker配置多的任务时,由于Nimbus thrift thread的OOM导致,Supervisor可能出现假死的情况
+2. 同时提交任务,后续的任务可能会失败
+3. tickTuple不需要ack,更正对于tickTuple不正确的failed消息统计
+4. 解决use.old.assignment=true时,默认调度可能出现错误 
+5. 解决删除topology zk 清理不干净问题
+6. 解决当做任务分配时, restart topology失败问题
+7. 解决同时提交多个topology 竞争问题
+8. 解决NPE 当注册metrics 
+9. 解决 zkTool 读取 monitor的 znode 失败问题
+10.解决 本地模式和打开classloader模式下, 出现异常问题
+11.解决使用自定义日志logback时, 本地模式下,打印双份日志问题
+## 运维& 脚本
+1. Add rpm build spec
+2. Add deploy files of jstorm for rpm package building
+3. cronjob改成每小时运行一次, 并且coredump 改成保留1个小时
 
 #Release 0.9.6.3
 ## New features

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/pom.xml
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/pom.xml b/jstorm-client-extension/pom.xml
deleted file mode 100644
index 40650cd..0000000
--- a/jstorm-client-extension/pom.xml
+++ /dev/null
@@ -1,85 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	
-	
-	<parent>
-		<groupId>com.alibaba.jstorm</groupId>
-		<artifactId>jstorm-all</artifactId>
-		<version>0.9.6.3</version>
-		<relativePath>..</relativePath>
-	</parent>
- 	<!-- <parent>
- 		<groupId>com.taobao</groupId> 
- 		<artifactId>parent</artifactId> 
- 		<version>1.0.2</version> 
- 	</parent>  -->
-	<modelVersion>4.0.0</modelVersion>
-	<groupId>com.alibaba.jstorm</groupId>
-	<artifactId>jstorm-client-extension</artifactId>
-	<version>0.9.6.3</version>
-	<packaging>jar</packaging>
-	<name>${project.artifactId}-${project.version}</name>
-
-	<build>
-		<plugins>
-			<plugin>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>2.3.2</version>
-				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-source-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>attach-sources</id>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-	<properties>
-		<powermock.version>1.4.11</powermock.version>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-	</properties>
-	<dependencies>
-		<dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-client</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba</groupId>
-			<artifactId>fastjson</artifactId>
-			<version>1.1.41</version>
-		</dependency>
-		<dependency>
-		  <groupId>com.sun.net.httpserver</groupId>
-		  <artifactId>http</artifactId>
-		  <version>20070405</version>
-		</dependency>
-		<dependency>
-			<groupId>org.powermock</groupId>
-			<artifactId>powermock-module-junit4</artifactId>
-			<version>${powermock.version}</version>
-			<scope>test</scope>
-		</dependency>	
-		<dependency>
-			<groupId>com.codahale.metrics</groupId>
-			<artifactId>metrics-core</artifactId>
-			<version>3.0.1</version>
-		</dependency>		
-		<dependency>
-			<groupId>com.codahale.metrics</groupId>
-			<artifactId>metrics-jvm</artifactId>
-			<version>3.0.1</version>
-		</dependency>
-	</dependencies>
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchId.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchId.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchId.java
deleted file mode 100644
index e147db0..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchId.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.alibaba.jstorm.batch;
-
-import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-public class BatchId implements Serializable {
-	private static final long serialVersionUID = 5720810158625748049L;
-	protected final long id;
-
-	protected BatchId(long id) {
-		this.id = id;
-	}
-
-	public long getId() {
-		return id;
-	}
-
-	@Override
-	public int hashCode() {
-		final int prime = 31;
-		int result = 1;
-		result = prime * result + (int) (id ^ (id >>> 32));
-		return result;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (this == obj)
-			return true;
-		if (obj == null)
-			return false;
-		if (getClass() != obj.getClass())
-			return false;
-		BatchId other = (BatchId) obj;
-		if (id != other.id)
-			return false;
-		return true;
-	}
-
-	@Override
-	public String toString() {
-		return ToStringBuilder.reflectionToString(this,
-				ToStringStyle.SHORT_PREFIX_STYLE);
-	}
-
-	private static AtomicLong staticId = new AtomicLong(0);
-	
-	public static void updateId(long id) {
-		staticId.set(id);
-	}
-
-	public static BatchId mkInstance() {
-		long id = staticId.incrementAndGet();
-
-		return new BatchId(id);
-	}
-	
-	public static BatchId incBatchId(BatchId old) {
-		long other = old.getId();
-		return new BatchId(other + 1);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java
deleted file mode 100644
index dc599f1..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package com.alibaba.jstorm.batch;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-
-import com.alibaba.jstorm.batch.impl.BatchSpoutTrigger;
-import com.alibaba.jstorm.batch.impl.CoordinatedBolt;
-import com.alibaba.jstorm.batch.util.BatchDef;
-
-public class BatchTopologyBuilder {
-	private static final Logger LOG = Logger
-			.getLogger(BatchTopologyBuilder.class);
-
-	private TopologyBuilder topologyBuilder;
-
-	private SpoutDeclarer spoutDeclarer;
-
-	public BatchTopologyBuilder(String topologyName) {
-		topologyBuilder = new TopologyBuilder();
-
-		spoutDeclarer = topologyBuilder.setSpout(BatchDef.SPOUT_TRIGGER,
-				new BatchSpoutTrigger(), 1);
-	}
-
-	public BoltDeclarer setSpout(String id, IBatchSpout spout, int paralel) {
-
-		BoltDeclarer boltDeclarer = this
-				.setBolt(id, (IBatchSpout) spout, paralel);
-		boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
-				BatchDef.COMPUTING_STREAM_ID);
-
-		return boltDeclarer;
-	}
-
-	public BoltDeclarer setBolt(String id, IBasicBolt bolt, int paralel) {
-		CoordinatedBolt coordinatedBolt = new CoordinatedBolt(bolt);
-
-		BoltDeclarer boltDeclarer = topologyBuilder.setBolt(id,
-				coordinatedBolt, paralel);
-
-		if (bolt instanceof IPrepareCommit) {
-			boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
-					BatchDef.PREPARE_STREAM_ID);
-		}
-
-		if (bolt instanceof ICommitter) {
-			boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
-					BatchDef.COMMIT_STREAM_ID);
-			boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
-					BatchDef.REVERT_STREAM_ID);
-		}
-
-		if (bolt instanceof IPostCommit) {
-			boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
-					BatchDef.POST_STREAM_ID);
-		}
-
-		return boltDeclarer;
-	}
-
-	public TopologyBuilder getTopologyBuilder() {
-		return topologyBuilder;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java
deleted file mode 100644
index d3d1178..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.alibaba.jstorm.batch;
-
-import java.io.Serializable;
-
-import backtype.storm.topology.IBasicBolt;
-
-public interface IBatchSpout extends IBasicBolt, ICommitter, Serializable {
-
-	/**
-	 * input's filed 0 is BatchId
-	 * 
-	 * execute only receive trigger message
-	 * 
-	 * do emitBatch operation in execute whose streamID is "batch/compute-stream"
-	 */
-	//void execute(Tuple input, IBasicOutputCollector collector);
-    /**
-	 * begin to ack batchId's data
-	 * 
-	 * return value will be stored in ZK, so sometimes don't need special action
-	 * 
-	 * @param id
-	 */
-	//byte[] commit(BatchId id) throws FailedException;
-	
-	/**
-	 * begin to revert batchId's data
-	 * 
-	 * If current task fails to commit batchId, it won't call revert(batchId)
-	 * If current task fails to revert batchId, JStorm won't call revert again.  
-	 * 
-	 * if not transaction, it can don't care revert
-	 * 
-	 * @param id
-	 */
-	//void revert(BatchId id, byte[] commitResult);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/ICommitter.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/ICommitter.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/ICommitter.java
deleted file mode 100644
index 9492398..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/ICommitter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.alibaba.jstorm.batch;
-
-import java.io.Serializable;
-
-import backtype.storm.topology.FailedException;
-
-/**
- * The less committer, the state is more stable.
- * Don't need to do 
- * 
- * @author zhongyan.feng
- * @version
- */
-public interface ICommitter extends Serializable{
-	/**
-	 * begin to commit batchId's data, then return the commit result
-	 * The commitResult will store into outside storage
-	 * 
-	 * if failed to commit, please throw FailedException
-	 * 
-	 * 
-	 * 
-	 * @param id
-	 */
-	byte[] commit(BatchId id) throws FailedException;
-	
-	/**
-	 * begin to revert batchId's data
-	 * 
-	 * If current task fails to commit batchId, it won't call revert(batchId)
-	 * If current task fails to revert batchId, JStorm won't call revert again.  
-	 * 
-	 * @param id
-	 */
-	void revert(BatchId id, byte[] commitResult);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPostCommit.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPostCommit.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPostCommit.java
deleted file mode 100644
index b408a3f..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPostCommit.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package com.alibaba.jstorm.batch;
-
-import backtype.storm.topology.BasicOutputCollector;
-
-
-public interface IPostCommit {
-	/**
-	 * Do after commit
-	 * Don't care failure of postCommit
-	 * 
-	 * @param id
-	 */
-	void postCommit(BatchId id, BasicOutputCollector collector);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java
deleted file mode 100644
index dd3da44..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.alibaba.jstorm.batch;
-
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.FailedException;
-
-/**
- * Called before commit, after finish batch
- * 
- * @author zhongyan.feng
- */
-public interface IPrepareCommit {
-
-	/**
-	 * Do prepare before commit
-	 * 
-	 * @param id
-	 * @param collector
-	 */
-	void prepareCommit(BatchId id, BasicOutputCollector collector) throws FailedException;
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java
deleted file mode 100644
index 63704fb..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.alibaba.jstorm.batch.impl;
-
-import java.io.Serializable;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-import com.alibaba.jstorm.batch.BatchId;
-import com.alibaba.jstorm.batch.util.BatchStatus;
-
-public class BatchSpoutMsgId implements Serializable{
-
-	/**  */
-	private static final long serialVersionUID = 2899009971479957517L;
-	
-	private final BatchId batchId;
-	private BatchStatus batchStatus;
-	
-	protected BatchSpoutMsgId(BatchId batchId, BatchStatus batchStatus) {
-		this.batchId = batchId;
-		this.batchStatus = batchStatus;
-	}
-	
-	public static BatchSpoutMsgId mkInstance() {
-		BatchId batchId = BatchId.mkInstance();
-		BatchStatus batchStatus = BatchStatus.COMPUTING;
-		
-		return new BatchSpoutMsgId(batchId, batchStatus);
-	}
-	
-	
-	public BatchStatus getBatchStatus() {
-		return batchStatus;
-	}
-
-	public void setBatchStatus(BatchStatus batchStatus) {
-		this.batchStatus = batchStatus;
-	}
-
-	public BatchId getBatchId() {
-		return batchId;
-	}
-
-	@Override
-	public String toString() {
-		return ToStringBuilder.reflectionToString(this,
-				ToStringStyle.SHORT_PREFIX_STYLE);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java
deleted file mode 100644
index d0c3e94..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java
+++ /dev/null
@@ -1,312 +0,0 @@
-package com.alibaba.jstorm.batch.impl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import com.alibaba.jstorm.batch.BatchId;
-import com.alibaba.jstorm.batch.util.BatchCommon;
-import com.alibaba.jstorm.batch.util.BatchDef;
-import com.alibaba.jstorm.batch.util.BatchStatus;
-import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.ClusterState;
-import com.alibaba.jstorm.utils.IntervalCheck;
-import com.alibaba.jstorm.utils.JStormUtils;
-
-/**
- * Strong Sequence
- * 
- * @author zhongyan.feng
- * @version
- */
-public class BatchSpoutTrigger implements IRichSpout {
-	/**  */
-	private static final long serialVersionUID = 7215109169247425954L;
-
-	private static final Logger LOG = Logger.getLogger(BatchSpoutTrigger.class);
-
-	private LinkedBlockingQueue<BatchSpoutMsgId> batchQueue;
-
-	private transient ClusterState zkClient;
-
-	private transient SpoutOutputCollector collector;
-
-	private static final String ZK_NODE_PATH = "/trigger";
-
-	private static BatchId currentBatchId = null;
-
-	private Map conf;
-
-	private String taskName;
-	
-	private IntervalCheck intervalCheck;
-
-	/**
-	 * @throws Exception
-	 * 
-	 */
-	public void initMsgId() throws Exception {
-		Long zkMsgId = null;
-		byte[] data = zkClient.get_data(ZK_NODE_PATH, false);
-		if (data != null) {
-			String value = new String(data);
-			try {
-				zkMsgId = Long.valueOf(value);
-				LOG.info("ZK msgId:" + zkMsgId);
-			} catch (Exception e) {
-				LOG.warn("Failed to get msgId ", e);
-
-			}
-
-		}
-
-		if (zkMsgId != null) {
-			BatchId.updateId(zkMsgId);
-		}
-
-		int max_spout_pending = JStormUtils.parseInt(
-				conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 1);
-
-		for (int i = 0; i < max_spout_pending; i++) {
-			BatchSpoutMsgId msgId = BatchSpoutMsgId.mkInstance();
-			if (currentBatchId == null) {
-				currentBatchId = msgId.getBatchId();
-			}
-			batchQueue.offer(msgId);
-			LOG.info("Push into queue," + msgId);
-		}
-
-	}
-
-	@Override
-	public void open(Map conf, TopologyContext context,
-			SpoutOutputCollector collector) {
-		batchQueue = new LinkedBlockingQueue<BatchSpoutMsgId>();
-		this.collector = collector;
-		this.conf = conf;
-		taskName = context.getThisComponentId() + "_" + context.getThisTaskId();
-		
-		intervalCheck = new IntervalCheck();
-
-		try {
-			zkClient = BatchCommon.getZkClient(conf);
-
-			initMsgId();
-
-		} catch (Exception e) {
-			LOG.error("", e);
-			throw new RuntimeException("Failed to init");
-		}
-		LOG.info("Successfully open " + taskName);
-	}
-
-	@Override
-	public void close() {
-	}
-
-	@Override
-	public void activate() {
-		LOG.info("Activate " + taskName);
-	}
-
-	@Override
-	public void deactivate() {
-		LOG.info("Deactivate " + taskName);
-	}
-
-	protected String getStreamId(BatchStatus batchStatus) {
-		if (batchStatus == BatchStatus.COMPUTING) {
-			return BatchDef.COMPUTING_STREAM_ID;
-		} else if (batchStatus == BatchStatus.PREPARE_COMMIT) {
-			return BatchDef.PREPARE_STREAM_ID;
-		} else if (batchStatus == BatchStatus.COMMIT) {
-			return BatchDef.COMMIT_STREAM_ID;
-		} else if (batchStatus == BatchStatus.POST_COMMIT) {
-			return BatchDef.POST_STREAM_ID;
-		} else if (batchStatus == BatchStatus.REVERT_COMMIT) {
-			return BatchDef.REVERT_STREAM_ID;
-		} else {
-			LOG.error("Occur unkonw type BatchStatus " + batchStatus);
-			throw new RuntimeException();
-		}
-	}
-
-	protected boolean isCommitStatus(BatchStatus batchStatus) {
-		if (batchStatus == BatchStatus.COMMIT) {
-			return true;
-		} else if (batchStatus == BatchStatus.REVERT_COMMIT) {
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	protected boolean isCommitWait(BatchSpoutMsgId msgId) {
-
-		if (isCommitStatus(msgId.getBatchStatus()) == false) {
-			return false;
-		}
-
-		// left status is commit status
-		if (currentBatchId.getId() >= msgId.getBatchId().getId()) {
-			return false;
-		}
-
-		return true;
-	}
-
-	@Override
-	public void nextTuple() {
-		BatchSpoutMsgId msgId = null;
-		try {
-			msgId = batchQueue.poll(10, TimeUnit.MILLISECONDS);
-		} catch (InterruptedException e) {
-			LOG.error("", e);
-		}
-		if (msgId == null) {
-			return;
-		}
-
-		if (isCommitWait(msgId)) {
-
-			batchQueue.offer(msgId);
-			if (intervalCheck.check()) {
-				LOG.info("Current msgId " + msgId
-						+ ", but current commit BatchId is " + currentBatchId);
-			}else {
-				LOG.debug("Current msgId " + msgId
-						+ ", but current commit BatchId is " + currentBatchId);
-			}
-			
-			return;
-		}
-
-		String streamId = getStreamId(msgId.getBatchStatus());
-		List<Integer> outTasks = collector.emit(streamId,
-				new Values(msgId.getBatchId()), msgId);
-		if (outTasks.isEmpty()) {
-			forward(msgId);
-		}
-		return;
-
-	}
-
-	protected void mkMsgId(BatchSpoutMsgId oldMsgId) {
-		synchronized (BatchSpoutMsgId.class) {
-			if (currentBatchId.getId() <= oldMsgId.getBatchId().getId()) {
-				// this is normal case
-
-				byte[] data = String.valueOf(currentBatchId.getId()).getBytes();
-				try {
-					zkClient.set_data(ZK_NODE_PATH, data);
-				} catch (Exception e) {
-					LOG.error("Failed to update to ZK " + oldMsgId, e);
-				}
-
-				currentBatchId = BatchId.incBatchId(oldMsgId.getBatchId());
-
-			} else {
-				// bigger batchId has been failed, when old msgId finish
-				// it will go here
-
-			}
-
-		}
-
-		BatchSpoutMsgId newMsgId = BatchSpoutMsgId.mkInstance();
-		batchQueue.offer(newMsgId);
-		StringBuilder sb = new StringBuilder();
-		sb.append("Create new BatchId,");
-		sb.append("old:").append(oldMsgId);
-		sb.append("new:").append(newMsgId);
-		sb.append("currentBatchId:").append(currentBatchId);
-		LOG.info(sb.toString());
-	}
-
-	protected void forward(BatchSpoutMsgId msgId) {
-		BatchStatus status = msgId.getBatchStatus();
-
-		BatchStatus newStatus = status.forward();
-		if (newStatus == null) {
-			// create new status
-			mkMsgId(msgId);
-			LOG.info("Finish old batch " + msgId);
-
-		} else {
-			msgId.setBatchStatus(newStatus);
-			batchQueue.offer(msgId);
-			LOG.info("Forward batch " + msgId);
-		}
-	}
-
-	@Override
-	public void ack(Object msgId) {
-		if (msgId instanceof BatchSpoutMsgId) {
-			forward((BatchSpoutMsgId) msgId);
-			return;
-		} else {
-			LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":"
-					+ msgId);
-			return;
-		}
-	}
-
-	protected void handleFail(BatchSpoutMsgId msgId) {
-		LOG.info("Failed batch " + msgId);
-		BatchStatus status = msgId.getBatchStatus();
-
-		BatchStatus newStatus = status.error();
-		if (newStatus == BatchStatus.ERROR) {
-			// create new status
-			mkMsgId(msgId);
-
-		} else {
-
-			msgId.setBatchStatus(newStatus);
-			batchQueue.offer(msgId);
-
-		}
-	}
-
-	@Override
-	public void fail(Object msgId) {
-		if (msgId instanceof BatchSpoutMsgId) {
-			handleFail((BatchSpoutMsgId) msgId);
-		} else {
-			LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":"
-					+ msgId);
-			return;
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declareStream(BatchDef.COMPUTING_STREAM_ID, new Fields(
-				"BatchId"));
-		declarer.declareStream(BatchDef.PREPARE_STREAM_ID,
-				new Fields("BatchId"));
-		declarer.declareStream(BatchDef.COMMIT_STREAM_ID, new Fields("BatchId"));
-		declarer.declareStream(BatchDef.REVERT_STREAM_ID, new Fields("BatchId"));
-		declarer.declareStream(BatchDef.POST_STREAM_ID, new Fields("BatchId"));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		Map<String, Object> map = new HashMap<String, Object>();
-		ConfigExtension.setSpoutSingleThread(map, true);
-		return map;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java
deleted file mode 100644
index 0f4720b..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java
+++ /dev/null
@@ -1,281 +0,0 @@
-package com.alibaba.jstorm.batch.impl;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.ReportedFailedException;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TimeCacheMap;
-
-import com.alibaba.jstorm.batch.BatchId;
-import com.alibaba.jstorm.batch.ICommitter;
-import com.alibaba.jstorm.batch.IPostCommit;
-import com.alibaba.jstorm.batch.IPrepareCommit;
-import com.alibaba.jstorm.batch.util.BatchCommon;
-import com.alibaba.jstorm.batch.util.BatchDef;
-import com.alibaba.jstorm.batch.util.BatchStatus;
-import com.alibaba.jstorm.cluster.ClusterState;
-
-public class CoordinatedBolt implements IRichBolt {
-	private static final long serialVersionUID = 5720810158625748046L;
-	
-	public static Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
-
-	private IBasicBolt delegate;
-	private BasicOutputCollector basicCollector;
-	private OutputCollector collector;
-
-	private String taskId;
-	private String taskName;
-
-	private boolean isCommiter = false;
-	private String zkCommitPath;
-	private TimeCacheMap<Object, Object> commited;
-
-	public CoordinatedBolt(IBasicBolt delegate) {
-
-		this.delegate = delegate;
-
-	}
-
-	// use static variable to reduce zk connection
-	private static ClusterState zkClient = null;
-
-	public void mkCommitDir(Map conf) {
-
-		try {
-			zkClient = BatchCommon.getZkClient(conf);
-
-			zkCommitPath = BatchDef.ZK_COMMIT_DIR + BatchDef.ZK_SEPERATOR
-					+ taskId;
-			if (zkClient.node_existed(zkCommitPath, false)) {
-				zkClient.delete_node(zkCommitPath);
-			}
-			zkClient.mkdirs(zkCommitPath);
-			
-			LOG.info(taskName + " successfully create commit path" + zkCommitPath);
-		} catch (Exception e) {
-			LOG.error("Failed to create zk node", e);
-			throw new RuntimeException();
-		}
-	}
-
-	public void prepare(Map conf, TopologyContext context,
-			OutputCollector collector) {
-
-		taskId = String.valueOf(context.getThisTaskId());
-		taskName = context.getThisComponentId() + "_" + context.getThisTaskId();
-
-		this.basicCollector = new BasicOutputCollector(collector);
-		this.collector = collector;
-
-		if (delegate instanceof ICommitter) {
-			isCommiter = true;
-			commited = new TimeCacheMap<Object, Object>(
-					context.maxTopologyMessageTimeout());
-			mkCommitDir(conf);
-		}
-
-		delegate.prepare(conf, context);
-
-	}
-
-	public void removeUseless(String path, int reserveSize) throws Exception {
-		List<String> childs = zkClient.get_children(path, false);
-		Collections.sort(childs, new Comparator<String>() {
-
-			@Override
-			public int compare(String o1, String o2) {
-				try {
-					Long v1 = Long.valueOf(o1);
-					Long v2 = Long.valueOf(o2);
-					return v1.compareTo(v2);
-				}catch(Exception e) {
-					return o1.compareTo(o2);
-				}
-				
-			}
-    		
-    	});
-
-		for (int index = 0; index < childs.size() - reserveSize; index++) {
-			zkClient.delete_node(path + BatchDef.ZK_SEPERATOR
-					+ childs.get(index));
-		}
-	}
-	
-	public String getCommitPath(BatchId id) {
-		return zkCommitPath + BatchDef.ZK_SEPERATOR + id.getId();
-	}
-
-	public void updateToZk(Object id, byte[] commitResult) {
-		try {
-
-			removeUseless(zkCommitPath, BatchDef.ZK_COMMIT_RESERVER_NUM);
-
-			String path = getCommitPath((BatchId)id);
-			byte[] data = commitResult;
-			if (data == null) {
-				data = new byte[0];
-			}
-			zkClient.set_data(path, data);
-			LOG.info("Update " + path + " to zk");
-		} catch (Exception e) {
-			LOG.warn("Failed to update to zk,", e);
-
-		}
-
-	}
-
-	public byte[] getCommittedData(Object id) {
-		try {
-			String path = getCommitPath((BatchId)id);
-			byte[] data = zkClient.get_data(path, false);
-
-			return data;
-		} catch (Exception e) {
-			LOG.error("Failed to visit ZK,", e);
-			return null;
-		}
-	}
-
-	public void handleRegular(Tuple tuple) {
-		basicCollector.setContext(tuple);
-		try {
-			delegate.execute(tuple, basicCollector);
-			collector.ack(tuple);
-		} catch (FailedException e) {
-			if (e instanceof ReportedFailedException) {
-				collector.reportError(e);
-			}
-			collector.fail(tuple);
-		}
-
-	}
-
-	public void handlePrepareCommit(Tuple tuple) {
-		basicCollector.setContext(tuple);
-		try {
-			BatchId id = (BatchId) tuple.getValue(0);
-			((IPrepareCommit) delegate).prepareCommit(id, basicCollector);
-			collector.ack(tuple);
-		} catch (FailedException e) {
-			if (e instanceof ReportedFailedException) {
-				collector.reportError(e);
-			}
-			collector.fail(tuple);
-		}
-
-	}
-
-	public void handleCommit(Tuple tuple) {
-		Object id = tuple.getValue(0);
-		try {
-			byte[] commitResult = ((ICommitter) delegate).commit((BatchId) id);
-
-			collector.ack(tuple);
-
-			updateToZk(id, commitResult);
-			commited.put(id, commitResult);
-		} catch (Exception e) {
-			LOG.error("Failed to commit ", e);
-			collector.fail(tuple);
-		}
-	}
-
-	public void handleRevert(Tuple tuple) {
-		try {
-			Object id = tuple.getValue(0);
-			byte[] commitResult = null;
-
-			if (commited.containsKey(id)) {
-				commitResult = (byte[]) commited.get(id);
-			} else {
-				commitResult = getCommittedData(id);
-			}
-
-			if (commitResult != null) {
-				((ICommitter) delegate).revert((BatchId) id, commitResult);
-			}
-		} catch (Exception e) {
-			LOG.error("Failed to revert,", e);
-		}
-
-		collector.ack(tuple);
-	}
-
-	public void handlePostCommit(Tuple tuple) {
-
-		basicCollector.setContext(tuple);
-		try {
-			BatchId id = (BatchId) tuple.getValue(0);
-			((IPostCommit) delegate).postCommit(id, basicCollector);
-			
-		} catch (Exception e) {
-			LOG.info("Failed to do postCommit,", e);
-		}
-		collector.ack(tuple);
-	}
-
-	public void execute(Tuple tuple) {
-
-		BatchStatus batchStatus = getBatchStatus(tuple);
-
-		if (batchStatus == BatchStatus.COMPUTING) {
-			handleRegular(tuple);
-		} else if (batchStatus == BatchStatus.PREPARE_COMMIT) {
-			handlePrepareCommit(tuple);
-		} else if (batchStatus == BatchStatus.COMMIT) {
-			handleCommit(tuple);
-		} else if (batchStatus == BatchStatus.REVERT_COMMIT) {
-			handleRevert(tuple);
-		} else if (batchStatus == BatchStatus.POST_COMMIT) {
-			handlePostCommit(tuple);
-		} else {
-			throw new RuntimeException(
-					"Receive commit tuple, but not committer");
-		}
-	}
-
-	public void cleanup() {
-		delegate.cleanup();
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		delegate.declareOutputFields(declarer);
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return delegate.getComponentConfiguration();
-	}
-
-	private BatchStatus getBatchStatus(Tuple tuple) {
-		String streamId = tuple.getSourceStreamId();
-
-		if (streamId.equals(BatchDef.PREPARE_STREAM_ID)) {
-			return BatchStatus.PREPARE_COMMIT;
-		} else if (streamId.equals(BatchDef.COMMIT_STREAM_ID)) {
-			return BatchStatus.COMMIT;
-		} else if (streamId.equals(BatchDef.REVERT_STREAM_ID)) {
-			return BatchStatus.REVERT_COMMIT;
-		} else if (streamId.equals(BatchDef.POST_STREAM_ID)) {
-			return BatchStatus.POST_COMMIT;
-		} else {
-			return BatchStatus.COMPUTING;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java
deleted file mode 100644
index f99edfa..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.alibaba.jstorm.batch.util;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.Config;
-
-import com.alibaba.jstorm.cluster.ClusterState;
-import com.alibaba.jstorm.cluster.DistributedClusterState;
-import com.alibaba.jstorm.utils.JStormUtils;
-
-public class BatchCommon {
-	private static final Logger LOG = Logger.getLogger(BatchCommon.class);
-
-	private static ClusterState zkClient = null;
-
-	public static ClusterState getZkClient(Map conf) throws Exception {
-		synchronized (BatchCommon.class) {
-			if (zkClient != null) {
-				return zkClient;
-			}
-
-			List<String> zkServers = null;
-			if (conf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS) != null) {
-				zkServers = (List<String>) conf
-						.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS);
-			} else if (conf.get(Config.STORM_ZOOKEEPER_SERVERS) != null) {
-				zkServers = (List<String>) conf
-						.get(Config.STORM_ZOOKEEPER_SERVERS);
-			} else {
-				throw new RuntimeException("No setting zk");
-			}
-
-			int port = 2181;
-			if (conf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT) != null) {
-				port = JStormUtils.parseInt(
-						conf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT), 2181);
-			} else if (conf.get(Config.STORM_ZOOKEEPER_PORT) != null) {
-				port = JStormUtils.parseInt(
-						conf.get(Config.STORM_ZOOKEEPER_PORT), 2181);
-			}
-
-			String root = BatchDef.BATCH_ZK_ROOT;
-			if (conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT) != null) {
-				root = (String) conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
-			}
-
-			root = root + BatchDef.ZK_SEPERATOR
-					+ conf.get(Config.TOPOLOGY_NAME);
-
-			Map<Object, Object> tmpConf = new HashMap<Object, Object>();
-			tmpConf.putAll(conf);
-			tmpConf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
-			tmpConf.put(Config.STORM_ZOOKEEPER_ROOT, root);
-			zkClient = new DistributedClusterState(tmpConf);
-
-			LOG.info("Successfully connect ZK");
-			return zkClient;
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchDef.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchDef.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchDef.java
deleted file mode 100644
index 7d87cfa..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchDef.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.alibaba.jstorm.batch.util;
-
-
-public class BatchDef {
-	public static final String COMPUTING_STREAM_ID = "batch/compute-stream";
-	
-	public static final String PREPARE_STREAM_ID = "batch/parepare-stream";
-
-	public static final String COMMIT_STREAM_ID = "batch/commit-stream";
-
-	public static final String REVERT_STREAM_ID = "batch/revert-stream";
-	
-	public static final String POST_STREAM_ID = "batch/post-stream";
-
-	public static final String SPOUT_TRIGGER = "spout_trigger";
-
-	public static final String BATCH_ZK_ROOT = "batch";
-
-	public static final String ZK_COMMIT_DIR = "/commit";
-
-	public static final int ZK_COMMIT_RESERVER_NUM = 3;
-
-	public static final String ZK_SEPERATOR = "/";
-
-	
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchStatus.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchStatus.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchStatus.java
deleted file mode 100644
index e02daeb..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchStatus.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.alibaba.jstorm.batch.util;
-
-public  enum BatchStatus {
-	COMPUTING, 
-	PREPARE_COMMIT, 
-	COMMIT, 
-	REVERT_COMMIT, 
-	POST_COMMIT,
-	ERROR;
-
-	
-	
-	public BatchStatus forward() {
-		if (this == COMPUTING) {
-			return PREPARE_COMMIT;
-		}else if (this == PREPARE_COMMIT) {
-			return COMMIT;
-		}else if (this == COMMIT) {
-			return POST_COMMIT;
-		}else {
-			return null;
-		}
-	}
-	
-	public BatchStatus error() {
-		if (this == COMMIT) {
-			return REVERT_COMMIT;
-		}else {
-			return ERROR;
-		}
-	}
-	
-};

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopDefaultKill.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopDefaultKill.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopDefaultKill.java
deleted file mode 100644
index f392f22..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopDefaultKill.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-/**
- * Killer callback
- * 
- * @author yannian
- * 
- */
-
-public class AsyncLoopDefaultKill extends RunnableCallback {
-
-	@Override
-	public <T> Object execute(T... args) {
-		Exception e = (Exception) args[0];
-		JStormUtils.halt_process(1, "Async loop died!");
-		return e;
-	}
-
-	@Override
-	public void run() {
-		JStormUtils.halt_process(1, "Async loop died!");
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java
deleted file mode 100644
index 1b1e588..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import org.apache.log4j.Logger;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-/**
- * AsyncLoopThread 's runnable
- * 
- * The class wrapper RunnableCallback fn, if occur exception, run killfn
- * 
- * @author yannian
- * 
- */
-public class AsyncLoopRunnable implements Runnable {
-	private static Logger LOG = Logger.getLogger(AsyncLoopRunnable.class);
-
-	private RunnableCallback fn;
-	private RunnableCallback killfn;
-
-	public AsyncLoopRunnable(RunnableCallback fn, RunnableCallback killfn) {
-		this.fn = fn;
-		this.killfn = killfn;
-	}
-
-	private boolean needQuit(Object rtn) {
-		if (rtn != null) {
-			long sleepTime = Long.parseLong(String.valueOf(rtn));
-			if (sleepTime < 0) {
-				return true;
-			}else if (sleepTime > 0) {
-				JStormUtils.sleepMs(sleepTime * 1000);
-			} 
-		}
-		return false;
-	}
-
-	@Override
-	public void run() {
-
-		try {
-			while (true) {
-				Exception e = null;
-
-				try {
-					if (fn == null) {
-						LOG.warn("fn==null");
-						throw new RuntimeException("AsyncLoopRunnable no core function ");
-					}
-
-					fn.run();
-
-					e = fn.error();
-
-				} catch (Exception ex) {
-					e = ex;
-				}
-				if (e != null) {
-					fn.shutdown();
-					throw e;
-				}
-				Object rtn = fn.getResult();
-				if (this.needQuit(rtn)) {
-					return;
-				}
-
-			}
-		} catch (InterruptedException e) {
-			LOG.info("Async loop interrupted!");
-		} catch (Throwable e) {
-			Object rtn = fn.getResult();
-			if (this.needQuit(rtn)) {
-				return;
-			}else {
-				LOG.error("Async loop died!", e);
-				killfn.execute(e);
-			}
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java
deleted file mode 100644
index 534386d..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.utils.Time;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.SmartThread;
-
-/**
- * Wrapper Timer thread Every several seconds execute afn, if something is run,
- * run kill_fn
- * 
- * 
- * @author yannian
- * 
- */
-public class AsyncLoopThread implements SmartThread {
-	private static final Logger LOG = Logger.getLogger(AsyncLoopThread.class);
-
-	private Thread thread;
-	
-	private RunnableCallback afn;
-
-	public AsyncLoopThread(RunnableCallback afn) {
-		this.init(afn, false, Thread.NORM_PRIORITY, true);
-	}
-
-	public AsyncLoopThread(RunnableCallback afn, boolean daemon, int priority,
-			boolean start) {
-		this.init(afn, daemon, priority, start);
-	}
-
-	public AsyncLoopThread(RunnableCallback afn, boolean daemon,
-			RunnableCallback kill_fn, int priority, boolean start) {
-		this.init(afn, daemon, kill_fn, priority, start);
-	}
-
-	public void init(RunnableCallback afn, boolean daemon, int priority,
-			boolean start) {
-		RunnableCallback kill_fn = new AsyncLoopDefaultKill();
-		this.init(afn, daemon, kill_fn, priority, start);
-	}
-
-	/**
-	 * 
-	 * @param afn
-	 * @param daemon
-	 * @param kill_fn
-	 *            (Exception e)
-	 * @param priority
-	 * @param args_fn
-	 * @param start
-	 */
-	private void init(RunnableCallback afn, boolean daemon,
-			RunnableCallback kill_fn, int priority, boolean start) {
-		if (kill_fn == null) {
-			kill_fn = new AsyncLoopDefaultKill();
-		}
-		
-		Runnable runable = new AsyncLoopRunnable(afn, kill_fn);
-		thread = new Thread(runable);
-		String threadName = afn.getThreadName();
-		if (threadName == null) {
-			threadName = afn.getClass().getSimpleName();
-		}
-		thread.setName(threadName);
-		thread.setDaemon(daemon);
-		thread.setPriority(priority);
-		thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-			@Override
-			public void uncaughtException(Thread t, Throwable e) {
-				LOG.error("UncaughtException", e);
-				JStormUtils.halt_process(1, "UncaughtException");
-			}
-		});
-		
-		this.afn = afn;
-		
-		if (start) {
-			thread.start();
-		}
-		
-	}
-
-	@Override
-	public void start() {
-		thread.start();
-	}
-
-	@Override
-	public void join() throws InterruptedException {
-		thread.join();
-	}
-
-	// for test
-	public void join(int times) throws InterruptedException {
-		thread.join(times);
-	}
-
-	@Override
-	public void interrupt() {
-		thread.interrupt();
-	}
-
-	@Override
-	public Boolean isSleeping() {
-		return Time.isThreadWaiting(thread);
-	}
-	
-	public Thread getThread() {
-		return thread;
-	}
-
-    @Override
-    public void cleanup() {
-        // TODO Auto-generated method stub
-        afn.cleanup();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/BaseCallback.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/BaseCallback.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/BaseCallback.java
deleted file mode 100644
index 31012fa..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/BaseCallback.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import com.alibaba.jstorm.callback.Callback;
-
-public class BaseCallback implements Callback {
-
-	@Override
-	public <T> Object execute(T... args) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/Callback.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/Callback.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/Callback.java
deleted file mode 100644
index d832a71..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/Callback.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-/**
- * Callback interface
- * 
- * @author lixin 2012-3-12
- * 
- */
-public interface Callback {
-
-	public <T> Object execute(T... args);
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/ClusterStateCallback.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/ClusterStateCallback.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/ClusterStateCallback.java
deleted file mode 100644
index 2726b1d..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/ClusterStateCallback.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import com.alibaba.jstorm.callback.BaseCallback;
-
-public class ClusterStateCallback extends BaseCallback {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java
deleted file mode 100644
index bd4ce25..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-import com.alibaba.jstorm.zk.ZkEventTypes;
-import com.alibaba.jstorm.zk.ZkKeeperStates;
-
-/**
- * Default ZK watch callback
- * 
- * @author yannian
- * 
- */
-public class DefaultWatcherCallBack implements WatcherCallBack {
-
-	private static Logger LOG = Logger.getLogger(DefaultWatcherCallBack.class);
-
-	@Override
-	public void execute(KeeperState state, EventType type, String path) {
-		LOG.info("Zookeeper state update:" + ZkKeeperStates.getStateName(state)
-				+ "," + ZkEventTypes.getStateName(type) + "," + path);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/RunnableCallback.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/RunnableCallback.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/RunnableCallback.java
deleted file mode 100644
index ccee6e2..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/RunnableCallback.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-
-/**
- * Base Runnable/Callback function
- * 
- * @author yannian
- * 
- */
-public class RunnableCallback implements Runnable, Callback {
-
-	@Override
-	public <T> Object execute(T... args) {
-		return null;
-	}
-
-	@Override
-	public void run() {
-
-	}
-
-	public Exception error() {
-		return null;
-	}
-
-	public Object getResult() {
-		return null;
-	}
-
-	/**
-	 * Called by exception
-	 */
-	public void shutdown() {
-	}
-	
-	/**
-	 * Normal quit
-	 */
-	public void cleanup() {
-	    
-	}
-	
-	public String getThreadName() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/WatcherCallBack.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/WatcherCallBack.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/WatcherCallBack.java
deleted file mode 100644
index 382cc4b..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/WatcherCallBack.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-public interface WatcherCallBack {
-	public void execute(KeeperState state, EventType type, String path);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java
deleted file mode 100644
index f71a4ce..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java
+++ /dev/null
@@ -1,642 +0,0 @@
-package com.alibaba.jstorm.client;
-
-import java.security.InvalidParameterException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-public class ConfigExtension {
-	/**
-	 * if this configure has been set, the spout or bolt will log all receive
-	 * tuples
-	 * 
-	 * topology.debug just for logging all sent tuples
-	 */
-	protected static final String TOPOLOGY_DEBUG_RECV_TUPLE = "topology.debug.recv.tuple";
-
-	public static void setTopologyDebugRecvTuple(Map conf, boolean debug) {
-		conf.put(TOPOLOGY_DEBUG_RECV_TUPLE, Boolean.valueOf(debug));
-	}
-
-	public static Boolean isTopologyDebugRecvTuple(Map conf) {
-		return JStormUtils.parseBoolean(conf.get(TOPOLOGY_DEBUG_RECV_TUPLE),
-				false);
-	}
-
-	/**
-	 * port number of deamon httpserver server
-	 */
-	private static final Integer DEFAULT_DEAMON_HTTPSERVER_PORT = 7621;
-
-	protected static final String SUPERVISOR_DEAMON_HTTPSERVER_PORT = "supervisor.deamon.logview.port";
-
-	public static Integer getSupervisorDeamonHttpserverPort(Map conf) {
-		return JStormUtils.parseInt(
-				conf.get(SUPERVISOR_DEAMON_HTTPSERVER_PORT),
-				DEFAULT_DEAMON_HTTPSERVER_PORT + 1);
-	}
-
-	protected static final String NIMBUS_DEAMON_HTTPSERVER_PORT = "nimbus.deamon.logview.port";
-
-	public static Integer getNimbusDeamonHttpserverPort(Map conf) {
-		return JStormUtils.parseInt(conf.get(NIMBUS_DEAMON_HTTPSERVER_PORT),
-				DEFAULT_DEAMON_HTTPSERVER_PORT);
-	}
-
-	/**
-	 * Worker gc parameter
-	 * 
-	 * 
-	 */
-	protected static final String WORKER_GC_CHILDOPTS = "worker.gc.childopts";
-
-	public static void setWorkerGc(Map conf, String gc) {
-		conf.put(WORKER_GC_CHILDOPTS, gc);
-	}
-
-	public static String getWorkerGc(Map conf) {
-		return (String) conf.get(WORKER_GC_CHILDOPTS);
-	}
-
-	protected static final String WOREKER_REDIRECT_OUTPUT = "worker.redirect.output";
-
-	public static boolean getWorkerRedirectOutput(Map conf) {
-		Object result = conf.get(WOREKER_REDIRECT_OUTPUT);
-		if (result == null)
-			return true;
-		return (Boolean) result;
-	}
-	
-	protected static final String WOREKER_REDIRECT_OUTPUT_FILE = "worker.redirect.output.file";
-	
-	public static void setWorkerRedirectOutputFile(Map conf, String outputPath) {
-		conf.put(WOREKER_REDIRECT_OUTPUT_FILE, outputPath);
-	}
-	
-	public static String getWorkerRedirectOutputFile(Map conf) {
-		return (String)conf.get(WOREKER_REDIRECT_OUTPUT_FILE);
-	}
-
-	/**
-	 * Usually, spout finish prepare before bolt, so spout need wait several
-	 * seconds so that bolt finish preparation
-	 * 
-	 * By default, the setting is 30 seconds
-	 */
-	protected static final String SPOUT_DELAY_RUN = "spout.delay.run";
-
-	public static void setSpoutDelayRunSeconds(Map conf, int delay) {
-		conf.put(SPOUT_DELAY_RUN, Integer.valueOf(delay));
-	}
-
-	public static int getSpoutDelayRunSeconds(Map conf) {
-		return JStormUtils.parseInt(conf.get(SPOUT_DELAY_RUN), 30);
-	}
-
-	/**
-	 * Default ZMQ Pending queue size
-	 */
-	public static final int DEFAULT_ZMQ_MAX_QUEUE_MSG = 1000;
-
-	/**
-	 * One task will alloc how many memory slot, the default setting is 1
-	 */
-	protected static final String MEM_SLOTS_PER_TASK = "memory.slots.per.task";
-
-	@Deprecated
-	public static void setMemSlotPerTask(Map conf, int slotNum) {
-		if (slotNum < 1) {
-			throw new InvalidParameterException();
-		}
-		conf.put(MEM_SLOTS_PER_TASK, Integer.valueOf(slotNum));
-	}
-
-	/**
-	 * One task will use cpu slot number, the default setting is 1
-	 */
-	protected static final String CPU_SLOTS_PER_TASK = "cpu.slots.per.task";
-
-	@Deprecated
-	public static void setCpuSlotsPerTask(Map conf, int slotNum) {
-		if (slotNum < 1) {
-			throw new InvalidParameterException();
-		}
-		conf.put(CPU_SLOTS_PER_TASK, Integer.valueOf(slotNum));
-	}
-
-	/**
-	 * if the setting has been set, the component's task must run different node
-	 * This is conflict with USE_SINGLE_NODE
-	 */
-	protected static final String TASK_ON_DIFFERENT_NODE = "task.on.differ.node";
-
-	public static void setTaskOnDifferentNode(Map conf, boolean isIsolate) {
-		conf.put(TASK_ON_DIFFERENT_NODE, Boolean.valueOf(isIsolate));
-	}
-
-	public static boolean isTaskOnDifferentNode(Map conf) {
-		return JStormUtils
-				.parseBoolean(conf.get(TASK_ON_DIFFERENT_NODE), false);
-	}
-
-	protected static final String SUPERVISOR_ENABLE_CGROUP = "supervisor.enable.cgroup";
-
-	public static boolean isEnableCgroup(Map conf) {
-		return JStormUtils.parseBoolean(conf.get(SUPERVISOR_ENABLE_CGROUP),
-				false);
-	}
-
-	/**
-	 * If component or topology configuration set "use.old.assignment", will try
-	 * use old assignment firstly
-	 */
-	protected static final String USE_OLD_ASSIGNMENT = "use.old.assignment";
-
-	public static void setUseOldAssignment(Map conf, boolean useOld) {
-		conf.put(USE_OLD_ASSIGNMENT, Boolean.valueOf(useOld));
-	}
-	
-	public static boolean isUseOldAssignment(Map conf) {
-		return JStormUtils.parseBoolean(conf.get(USE_OLD_ASSIGNMENT), false);
-	}
-
-	/**
-	 * The supervisor's hostname
-	 */
-	protected static final String SUPERVISOR_HOSTNAME = "supervisor.hostname";
-	public static final Object SUPERVISOR_HOSTNAME_SCHEMA = String.class;
-
-	public static String getSupervisorHost(Map conf) {
-		return (String) conf.get(SUPERVISOR_HOSTNAME);
-	}
-
-	protected static final String SUPERVISOR_USE_IP = "supervisor.use.ip";
-
-	public static boolean isSupervisorUseIp(Map conf) {
-		return JStormUtils.parseBoolean(conf.get(SUPERVISOR_USE_IP), false);
-	}
-
-	protected static final String NIMBUS_USE_IP = "nimbus.use.ip";
-
-	public static boolean isNimbusUseIp(Map conf) {
-		return JStormUtils.parseBoolean(conf.get(NIMBUS_USE_IP), false);
-	}
-
-	protected static final String TOPOLOGY_ENABLE_CLASSLOADER = "topology.enable.classloader";
-
-	public static boolean isEnableTopologyClassLoader(Map conf) {
-		return JStormUtils.parseBoolean(conf.get(TOPOLOGY_ENABLE_CLASSLOADER),
-				false);
-	}
-
-	public static void setEnableTopologyClassLoader(Map conf, boolean enable) {
-		conf.put(TOPOLOGY_ENABLE_CLASSLOADER, Boolean.valueOf(enable));
-	}
-	
-	protected static String CLASSLOADER_DEBUG = "classloader.debug";
-	
-	public static boolean isEnableClassloaderDebug(Map conf) {
-		return JStormUtils.parseBoolean(conf.get(CLASSLOADER_DEBUG), false);
-	}
-	
-	public static void setEnableClassloaderDebug(Map conf, boolean enable) {
-		conf.put(CLASSLOADER_DEBUG, enable);
-	}
-
-	protected static final String CONTAINER_NIMBUS_HEARTBEAT = "container.nimbus.heartbeat";
-
-	/**
-	 * Get to know whether nimbus is run under Apsara/Yarn container
-	 * 
-	 * @param conf
-	 * @return
-	 */
-	public static boolean isEnableContainerNimbus() {
-		String path = System.getenv(CONTAINER_NIMBUS_HEARTBEAT);
-
-		if (StringUtils.isBlank(path)) {
-			return false;
-		} else {
-			return true;
-		}
-	}
-
-	/**
-	 * Get Apsara/Yarn nimbus container's hearbeat dir
-	 * 
-	 * @param conf
-	 * @return
-	 */
-	public static String getContainerNimbusHearbeat() {
-		return System.getenv(CONTAINER_NIMBUS_HEARTBEAT);
-	}
-
-	protected static final String CONTAINER_SUPERVISOR_HEARTBEAT = "container.supervisor.heartbeat";
-
-	/**
-	 * Get to know whether supervisor is run under Apsara/Yarn supervisor
-	 * container
-	 * 
-	 * @param conf
-	 * @return
-	 */
-	public static boolean isEnableContainerSupervisor() {
-		String path = System.getenv(CONTAINER_SUPERVISOR_HEARTBEAT);
-
-		if (StringUtils.isBlank(path)) {
-			return false;
-		} else {
-			return true;
-		}
-	}
-
-	/**
-	 * Get Apsara/Yarn supervisor container's hearbeat dir
-	 * 
-	 * @param conf
-	 * @return
-	 */
-	public static String getContainerSupervisorHearbeat() {
-		return (String) System.getenv(CONTAINER_SUPERVISOR_HEARTBEAT);
-	}
-
-	protected static final String CONTAINER_HEARTBEAT_TIMEOUT_SECONDS = "container.heartbeat.timeout.seconds";
-
-	public static int getContainerHeartbeatTimeoutSeconds(Map conf) {
-		return JStormUtils.parseInt(
-				conf.get(CONTAINER_HEARTBEAT_TIMEOUT_SECONDS), 240);
-	}
-
-	protected static final String CONTAINER_HEARTBEAT_FREQUENCE = "container.heartbeat.frequence";
-
-	public static int getContainerHeartbeatFrequence(Map conf) {
-		return JStormUtils
-				.parseInt(conf.get(CONTAINER_HEARTBEAT_FREQUENCE), 10);
-	}
-
-	protected static final String JAVA_SANDBOX_ENABLE = "java.sandbox.enable";
-
-	public static boolean isJavaSandBoxEnable(Map conf) {
-		return JStormUtils.parseBoolean(conf.get(JAVA_SANDBOX_ENABLE), false);
-	}
-
-	protected static String SPOUT_SINGLE_THREAD = "spout.single.thread";
-
-	public static boolean isSpoutSingleThread(Map conf) {
-		return JStormUtils.parseBoolean(conf.get(SPOUT_SINGLE_THREAD), false);
-	}
-
-	public static void setSpoutSingleThread(Map conf, boolean enable) {
-		conf.put(SPOUT_SINGLE_THREAD, enable);
-	}
-
-	protected static String WORKER_STOP_WITHOUT_SUPERVISOR = "worker.stop.without.supervisor";
-
-	public static boolean isWorkerStopWithoutSupervisor(Map conf) {
-		return JStormUtils.parseBoolean(
-				conf.get(WORKER_STOP_WITHOUT_SUPERVISOR), false);
-	}
-
-	protected static String CGROUP_ROOT_DIR = "supervisor.cgroup.rootdir";
-
-	public static String getCgroupRootDir(Map conf) {
-		return (String) conf.get(CGROUP_ROOT_DIR);
-	}
-
-	protected static String NETTY_TRANSFER_ASYNC_AND_BATCH = "storm.messaging.netty.transfer.async.batch";
-
-	public static boolean isNettyTransferAsyncBatch(Map conf) {
-		return JStormUtils.parseBoolean(
-				conf.get(NETTY_TRANSFER_ASYNC_AND_BATCH), true);
-	}
-
-	protected static final String USE_USERDEFINE_ASSIGNMENT = "use.userdefine.assignment";
-
-	public static void setUserDefineAssignment(Map conf,
-			List<WorkerAssignment> userDefines) {
-		List<String> ret = new ArrayList<String>();
-		for (WorkerAssignment worker : userDefines) {
-			ret.add(Utils.to_json(worker));
-		}
-		conf.put(USE_USERDEFINE_ASSIGNMENT, ret);
-	}
-	
-	public static List<WorkerAssignment> getUserDefineAssignment(Map conf) {
-		List<WorkerAssignment> ret = new ArrayList<WorkerAssignment>();
-		if (conf.get(USE_USERDEFINE_ASSIGNMENT) == null)
-			return ret;
-		for (String worker : (List<String>) conf.get(USE_USERDEFINE_ASSIGNMENT)) {
-			ret.add(WorkerAssignment.parseFromObj(Utils.from_json(worker)));
-		}
-		return ret;
-	}
-
-	protected static final String MEMSIZE_PER_WORKER = "worker.memory.size";
-
-	public static void setMemSizePerWorker(Map conf, long memSize) {
-		conf.put(MEMSIZE_PER_WORKER, memSize);
-	}
-
-	public static void setMemSizePerWorkerByKB(Map conf, long memSize) {
-        long size = memSize * 1024l;
-        setMemSizePerWorker(conf, size);
-    }
-	
-    public static void setMemSizePerWorkerByMB(Map conf, long memSize) {
-        long size = memSize * 1024l;
-        setMemSizePerWorkerByKB(conf, size);
-    }
-
-    public static void setMemSizePerWorkerByGB(Map conf, long memSize) {
-        long size = memSize * 1024l;
-        setMemSizePerWorkerByMB(conf, size);
-    }
-    
-    public static long getMemSizePerWorker(Map conf) {
-		long size = JStormUtils.parseLong(conf.get(MEMSIZE_PER_WORKER),
-				JStormUtils.SIZE_1_G * 2);
-		return size > 0 ? size : JStormUtils.SIZE_1_G * 2;
-	}
-    
-	protected static final String CPU_SLOT_PER_WORKER = "worker.cpu.slot.num";
-
-	public static void setCpuSlotNumPerWorker(Map conf, int slotNum) {
-		conf.put(CPU_SLOT_PER_WORKER, slotNum);
-	}
-	
-	public static int getCpuSlotPerWorker(Map conf) {
-		int slot = JStormUtils.parseInt(conf.get(CPU_SLOT_PER_WORKER), 1);
-		return slot > 0 ? slot : 1;
-	}
-
-	protected static String TOPOLOGY_PERFORMANCE_METRICS = "topology.performance.metrics";
-
-	public static boolean isEnablePerformanceMetrics(Map conf) {
-		return JStormUtils.parseBoolean(conf.get(TOPOLOGY_PERFORMANCE_METRICS),
-				true);
-	}
-
-	public static void setPerformanceMetrics(Map conf, boolean isEnable) {
-		conf.put(TOPOLOGY_PERFORMANCE_METRICS, isEnable);
-	}
-
-	protected static String NETTY_BUFFER_THRESHOLD_SIZE = "storm.messaging.netty.buffer.threshold";
-
-	public static long getNettyBufferThresholdSize(Map conf) {
-		return JStormUtils.parseLong(conf.get(NETTY_BUFFER_THRESHOLD_SIZE),
-				8 *JStormUtils.SIZE_1_M);
-	}
-
-	public static void setNettyBufferThresholdSize(Map conf, long size) {
-		conf.put(NETTY_BUFFER_THRESHOLD_SIZE, size);
-	}
-	
-	protected static String NETTY_MAX_SEND_PENDING = "storm.messaging.netty.max.pending";
-	
-	public static void setNettyMaxSendPending(Map conf, long pending) {
-		conf.put(NETTY_MAX_SEND_PENDING, pending);
-	}
-	
-	public static long getNettyMaxSendPending(Map conf) {
-		return JStormUtils.parseLong(conf.get(NETTY_MAX_SEND_PENDING), 16);
-	}
-
-	protected static String DISRUPTOR_USE_SLEEP = "disruptor.use.sleep";
-    
-    public static boolean isDisruptorUseSleep(Map conf) {
-    	return JStormUtils.parseBoolean(conf.get(DISRUPTOR_USE_SLEEP), true);
-    }
-    
-    public static void setDisruptorUseSleep(Map conf, boolean useSleep) {
-    	conf.put(DISRUPTOR_USE_SLEEP, useSleep);
-    }
-    
-    public static boolean isTopologyContainAcker(Map conf) {
-    	int num = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), 1);
-    	if (num > 0) {
-    		return true;
-    	}else {
-    		return false;
-    	}
-    }
-    
-    protected static String NETTY_SYNC_MODE = "storm.messaging.netty.sync.mode";
-    
-    public static boolean isNettySyncMode(Map conf) {
-    	return JStormUtils.parseBoolean(conf.get(NETTY_SYNC_MODE), false);
-    }
-    
-    public static void setNettySyncMode(Map conf, boolean sync) {
-    	conf.put(NETTY_SYNC_MODE, sync);
-    }
-    
-    protected static String NETTY_ASYNC_BLOCK = "storm.messaging.netty.async.block";
-    public static boolean isNettyASyncBlock(Map conf) {
-    	return JStormUtils.parseBoolean(conf.get(NETTY_ASYNC_BLOCK), true);
-    }
-    
-    public static void setNettyASyncBlock(Map conf, boolean block) {
-    	conf.put(NETTY_ASYNC_BLOCK, block);
-    }
-    
-    protected static String ALIMONITOR_METRICS_POST = "topology.alimonitor.metrics.post";
-    
-    public static boolean isAlimonitorMetricsPost(Map conf) {
-    	return JStormUtils.parseBoolean(conf.get(ALIMONITOR_METRICS_POST), true);
-    }
-    
-    public static void setAlimonitorMetricsPost(Map conf, boolean post) {
-    	conf.put(ALIMONITOR_METRICS_POST, post);
-    }
-	
-	protected static String TASK_CLEANUP_TIMEOUT_SEC = "task.cleanup.timeout.sec";
-    
-    public static int getTaskCleanupTimeoutSec(Map conf) {
-    	return JStormUtils.parseInt(conf.get(TASK_CLEANUP_TIMEOUT_SEC), 10);
-    }
-    
-    public static void setTaskCleanupTimeoutSec(Map conf, int timeout) {
-    	conf.put(TASK_CLEANUP_TIMEOUT_SEC, timeout);
-    }
-    
-    protected static String UI_CLUSTERS = "ui.clusters";
-    protected static String UI_CLUSTER_NAME = "name";
-    protected static String UI_CLUSTER_ZK_ROOT = "zkRoot";
-    protected static String UI_CLUSTER_ZK_SERVERS = "zkServers";
-    protected static String UI_CLUSTER_ZK_PORT = "zkPort";
-    
-    public static List<Map> getUiClusters(Map conf) {
-    	return (List<Map>) conf.get(UI_CLUSTERS);
-    }
-    
-    public static void setUiClusters(Map conf, List<Map> uiClusters) {
-    	conf.put(UI_CLUSTERS, uiClusters);
-    }
-    
-    public static Map getUiClusterInfo(List<Map> uiClusters, String name) {
-    	Map ret = null;
-    	for (Map cluster : uiClusters) {
-    		String clusterName = getUiClusterName(cluster);
-    		if (clusterName.equals(name)) {
-    			ret = cluster;
-    			break;
-    		}
-    	}
-    	
-    	return ret;
-    }
-     
-    public static String getUiClusterName(Map uiCluster) {
-    	return (String) uiCluster.get(UI_CLUSTER_NAME);
-    }
-    
-    public static String getUiClusterZkRoot(Map uiCluster) {
-    	return (String) uiCluster.get(UI_CLUSTER_ZK_ROOT);
-    }
-    
-    public static List<String> getUiClusterZkServers(Map uiCluster) {
-    	return (List<String>) uiCluster.get(UI_CLUSTER_ZK_SERVERS);
-    }
-    
-    public static Integer getUiClusterZkPort(Map uiCluster) {
-    	return JStormUtils.parseInt(uiCluster.get(UI_CLUSTER_ZK_PORT));
-    }
-    
-    protected static String SPOUT_PEND_FULL_SLEEP = "spout.pending.full.sleep";
-    
-    public static boolean isSpoutPendFullSleep(Map conf) {
-    	return JStormUtils.parseBoolean(conf.get(SPOUT_PEND_FULL_SLEEP), false);
-    }
-    
-    public static void setSpoutPendFullSleep(Map conf, boolean sleep) {
-    	conf.put(SPOUT_PEND_FULL_SLEEP, sleep);
-    	
-    }
-    
-    protected static String LOGVIEW_ENCODING = "supervisor.deamon.logview.encoding";
-    protected static String UTF8 = "utf-8";
-    
-    public static String getLogViewEncoding(Map conf) {
-    	String ret = (String) conf.get(LOGVIEW_ENCODING);
-    	if (ret == null) ret = UTF8;
-    	return ret;
-    }
-    
-    public static void setLogViewEncoding(Map conf, String enc) {
-    	conf.put(LOGVIEW_ENCODING,  enc);
-    }
-    
-    public static String TASK_STATUS_ACTIVE = "Active";
-    public static String TASK_STATUS_STARTING = "Starting";
-    
-    protected static String ALIMONITOR_TOPO_METIRC_NAME = "topology.alimonitor.topo.metrics.name";
-    protected static String ALIMONITOR_TASK_METIRC_NAME = "topology.alimonitor.task.metrics.name";
-    protected static String ALIMONITOR_WORKER_METIRC_NAME = "topology.alimonitor.worker.metrics.name";
-    protected static String ALIMONITOR_USER_METIRC_NAME = "topology.alimonitor.user.metrics.name";
-    
-    public static String getAlmonTopoMetricName(Map conf) {
-    	return (String) conf.get(ALIMONITOR_TOPO_METIRC_NAME);
-    }
-    
-    public static String getAlmonTaskMetricName(Map conf) {
-    	return (String) conf.get(ALIMONITOR_TASK_METIRC_NAME);
-    }
-    
-    public static String getAlmonWorkerMetricName(Map conf) {
-    	return (String) conf.get(ALIMONITOR_WORKER_METIRC_NAME);
-    }
-    
-    public static String getAlmonUserMetricName(Map conf) {
-    	return (String) conf.get(ALIMONITOR_USER_METIRC_NAME);
-    }
-    
-    protected static String SPOUT_PARALLELISM = "topology.spout.parallelism";
-    protected static String BOLT_PARALLELISM = "topology.bolt.parallelism";
-    
-    public static Integer getSpoutParallelism(Map conf, String componentName) {
-        Integer ret = null;
-    	Map<String, String> map = (Map<String, String>)(conf.get(SPOUT_PARALLELISM));
-    	if(map != null) ret = JStormUtils.parseInt(map.get(componentName));
-    	return ret;
-    }
-    
-    public static Integer getBoltParallelism(Map conf, String componentName) {
-        Integer ret = null;
-    	Map<String, String> map = (Map<String, String>)(conf.get(BOLT_PARALLELISM));
-    	if(map != null) ret = JStormUtils.parseInt(map.get(componentName));
-        return ret;
-    }
-    
-    protected static String TOPOLOGY_BUFFER_SIZE_LIMITED = "topology.buffer.size.limited";
-    
-    public static void setTopologyBufferSizeLimited(Map conf, boolean limited) {
-    	conf.put(TOPOLOGY_BUFFER_SIZE_LIMITED, limited);
-    }
-    
-    public static boolean getTopologyBufferSizeLimited(Map conf) {
-    	boolean isSynchronized = isNettySyncMode(conf);
-    	if (isSynchronized == true) {
-    		return true;
-    	}
-    	
-    	return JStormUtils.parseBoolean(conf.get(TOPOLOGY_BUFFER_SIZE_LIMITED), true);
-    	
-    }
-    
-    protected static String SUPERVISOR_SLOTS_PORTS_BASE = "supervisor.slots.ports.base";
-    
-    public static int getSupervisorSlotsPortsBase(Map conf) {
-    	return JStormUtils.parseInt(conf.get(SUPERVISOR_SLOTS_PORTS_BASE), 6800);
-    }
-    
-    // SUPERVISOR_SLOTS_PORTS_BASE don't provide setting function, it must be set by configuration
-    
-    protected static String SUPERVISOR_SLOTS_PORT_CPU_WEIGHT = "supervisor.slots.port.cpu.weight";
-    public static double getSupervisorSlotsPortCpuWeight(Map conf) {
-    	Object value = conf.get(SUPERVISOR_SLOTS_PORT_CPU_WEIGHT);
-    	Double ret = JStormUtils.convertToDouble(value);
-    	if (ret == null) {
-    		return 1.0;
-    	}else {
-    		return ret;
-    	}
-    }
-    // SUPERVISOR_SLOTS_PORT_CPU_WEIGHT don't provide setting function, it must be set by configuration
-    
-    protected static String USER_DEFINED_LOG4J_CONF = "user.defined.log4j.conf";
-    
-    public static String getUserDefinedLog4jConf(Map conf) {
-    	return (String)conf.get(USER_DEFINED_LOG4J_CONF);
-    }
-    
-    public static void setUserDefinedLog4jConf(Map conf, String fileName) {
-    	conf.put(USER_DEFINED_LOG4J_CONF, fileName);
-    }
-    
-    protected static String USER_DEFINED_LOGBACK_CONF = "user.defined.logback.conf";
-    
-    public static String getUserDefinedLogbackConf(Map conf) {
-    	return (String)conf.get(USER_DEFINED_LOGBACK_CONF);
-    }
-    
-    public static void setUserDefinedLogbackConf(Map conf, String fileName) {
-    	conf.put(USER_DEFINED_LOGBACK_CONF, fileName);
-    }
-    
-    protected static String TASK_ERROR_INFO_REPORT_INTERVAL = "topology.task.error.report.interval";
-    
-    public static Integer getTaskErrorReportInterval(Map conf) {
-        return JStormUtils.parseInt(conf.get(TASK_ERROR_INFO_REPORT_INTERVAL), 60);
-    }
-    
-    public static void setTaskErrorReportInterval(Map conf, Integer interval) {
-        conf.put(TASK_ERROR_INFO_REPORT_INTERVAL, interval);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java
deleted file mode 100644
index 9eac326..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java
+++ /dev/null
@@ -1,264 +0,0 @@
-package com.alibaba.jstorm.client;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.apache.log4j.Logger;
-import org.json.simple.JSONAware;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.utils.Utils;
-
-
-public class WorkerAssignment extends WorkerSlot implements Serializable,
-		JSONAware {
-	private static final Logger LOG = Logger.getLogger(WorkerAssignment.class);
-
-
-	private static final long serialVersionUID = -3483047434535537861L;
-
-	private Map<String, Integer> componentToNum = new HashMap<String, Integer>();
-
-	private long mem;
-
-	private int cpu;
-
-	private String hostName;
-
-	private String jvm;
-	
-	private static final String COMPONENTTONUM_TAG = "componentToNum";
-	private static final String MEM_TAG = "mem";
-	private static final String CPU_TAG = "cpu";
-	private static final String HOSTNAME_TAG = "hostName";
-	private static final String JVM_TAG = "jvm";
-	private static final String NODEID_TAG = "nodeId";
-	private static final String PORT_TAG = "port";
-
-	public WorkerAssignment(String nodeId, Number port) {
-		super(nodeId, port);
-		// TODO Auto-generated constructor stub
-	}
-
-	public WorkerAssignment() {
-
-	}
-
-	public void addComponent(String compenentName, Integer num) {
-		componentToNum.put(compenentName, num);
-	}
-
-	public Map<String, Integer> getComponentToNum() {
-		return componentToNum;
-	}
-
-	public String getHostName() {
-		return hostName;
-	}
-
-	public void setHostName(String hostName) {
-		this.hostName = hostName;
-	}
-
-	public void setJvm(String jvm) {
-		this.jvm = jvm;
-	}
-	
-	public String getJvm() {
-		return jvm;
-	}
-
-	public long getMem() {
-		return mem;
-	}
-
-	public void setMem(long mem) {
-		this.mem = mem;
-	}
-
-	public int getCpu() {
-		return cpu;
-	}
-
-	public void setCpu(int cpu) {
-		this.cpu = cpu;
-	}
-
-	@Override
-	public String toJSONString() {
-//		StringBuilder sb = new StringBuilder();
-
-//		sb.append("[");
-//		sb.append("\"" + this.getNodeId() + "\"");
-//		sb.append(",");
-//		sb.append("\"" + this.hostName + "\"");
-//		sb.append(",");
-//		sb.append("\"" + String.valueOf(this.getPort()) + "\"");
-//		sb.append(",");
-//		sb.append("\"" + this.jvm + "\"");
-//		sb.append(",");
-//		sb.append("\"" + String.valueOf(this.mem) + "\"");
-//		sb.append(",");
-//		sb.append("\"" + String.valueOf(this.cpu) + "\"");
-//		sb.append(",");
-//		sb.append("{");
-//		for (Entry<String, Integer> entry : componentToNum.entrySet()) {
-//			sb.append("\"" + entry.getKey() + "\":");
-//			sb.append("\"" + String.valueOf(entry.getValue()) + "\"");
-//			sb.append(",");
-//		}
-//		sb.append("}");
-//		sb.append("]");
-		
-		
-		
-		Map<String, String> map = new HashMap<String, String>();
-
-		map.put(COMPONENTTONUM_TAG, Utils.to_json(componentToNum));
-		map.put(MEM_TAG, String.valueOf(mem));
-		map.put(CPU_TAG, String.valueOf(cpu));
-		map.put(HOSTNAME_TAG, hostName);
-		map.put(JVM_TAG, jvm);
-		map.put(NODEID_TAG, getNodeId());
-		map.put(PORT_TAG, String.valueOf(getPort()));
-	
-		
-		return Utils.to_json(map);
-	}
-
-	public static WorkerAssignment parseFromObj(Object obj) {
-		if (obj == null) {
-			return null;
-		}
-
-		if (obj instanceof Map == false) {
-			return null;
-		}
-
-		try {
-			Map<String, String> map = (Map<String, String>)obj;
-			
-			String supervisorId = map.get(NODEID_TAG);
-			String hostname = map.get(HOSTNAME_TAG);
-			Integer port = JStormUtils.parseInt(map.get(PORT_TAG));
-			String jvm = map.get(JVM_TAG);
-			Long mem = JStormUtils.parseLong(map.get(MEM_TAG));
-			Integer cpu = JStormUtils.parseInt(map.get(CPU_TAG));
-			Map<String, Object> componentToNum = (Map<String, Object>)Utils.from_json(map.get(COMPONENTTONUM_TAG));
-
-			WorkerAssignment ret = new WorkerAssignment(supervisorId, port);
-
-			
-			ret.hostName = hostname;
-			ret.setNodeId(supervisorId);
-			ret.setJvm(jvm);
-			if (port != null) {
-				ret.setPort(port);
-			}
-			if (mem != null) {
-				ret.setMem(mem);
-			}
-			if (cpu != null) {
-				ret.setCpu(cpu);
-			}
-			
-			for (Entry<String, Object> entry : componentToNum.entrySet()) {
-				ret.addComponent(entry.getKey(),
-						JStormUtils.parseInt(entry.getValue()));
-			}
-			return ret;
-		} catch (Exception e) {
-			LOG.error("Failed to convert to WorkerAssignment,  raw:" + obj, e);
-			return null;
-		}
-
-	}
-
-	public static String getStringFromJson(String text) {
-		return text.equals("null") ? null : text;
-	}
-
-	@Override
-	public String toString() {
-		return ToStringBuilder.reflectionToString(this,
-				ToStringStyle.SHORT_PREFIX_STYLE);
-	}
-
-	@Override
-	public int hashCode() {
-		final int prime = 31;
-		int result = super.hashCode();
-		result = prime * result
-				+ ((componentToNum == null) ? 0 : componentToNum.hashCode());
-		result = prime * result + cpu;
-		result = prime * result
-				+ ((hostName == null) ? 0 : hostName.hashCode());
-		result = prime * result + ((jvm == null) ? 0 : jvm.hashCode());
-		result = prime * result + (int) (mem ^ (mem >>> 32));
-		return result;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (this == obj)
-			return true;
-		if (!super.equals(obj))
-			return false;
-		if (getClass() != obj.getClass())
-			return false;
-		WorkerAssignment other = (WorkerAssignment) obj;
-		if (componentToNum == null) {
-			if (other.componentToNum != null)
-				return false;
-		} else if (!componentToNum.equals(other.componentToNum))
-			return false;
-		if (cpu != other.cpu)
-			return false;
-		if (hostName == null) {
-			if (other.hostName != null)
-				return false;
-		} else if (!hostName.equals(other.hostName))
-			return false;
-		if (jvm == null) {
-			if (other.jvm != null)
-				return false;
-		} else if (!jvm.equals(other.jvm))
-			return false;
-		if (mem != other.mem)
-			return false;
-		return true;
-	}
-
-	public static void main(String[] args) {
-		WorkerAssignment input = new WorkerAssignment();
-
-		 input.setJvm("sb");
-		
-		 input.setCpu(1);
-		
-		 input.setMem(2);
-
-		 input.addComponent("2b", 2);
-
-		String outString = Utils.to_json(input);
-
-		System.out.println(input);
-		
-		//String outString = "[componentToNum={},mem=1610612736,cpu=1,hostName=mobilejstorm-60-1,jvm=<null>,nodeId=<null>,port=0]";
-
-		Object object = Utils.from_json(outString);
-		System.out.println(object);
-
-		System.out.println(parseFromObj(object));
-
-		System.out.print(input.equals(parseFromObj(object)));
-	}
-	
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricCallback.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricCallback.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricCallback.java
deleted file mode 100644
index 964913e..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricCallback.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package com.alibaba.jstorm.client.metric;
-
-import com.codahale.metrics.Metric;
-
-public interface MetricCallback<T extends Metric> {
-	void callback(T metric);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricClient.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricClient.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricClient.java
deleted file mode 100644
index becc365..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricClient.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.alibaba.jstorm.client.metric;
-
-import backtype.storm.task.TopologyContext;
-
-import com.alibaba.jstorm.metric.Metrics;
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.alibaba.jstorm.metric.JStormTimer;
-import com.alibaba.jstorm.metric.JStormHistogram;
-
-public class MetricClient {
-
-	private final int taskid;
-
-	public MetricClient(TopologyContext context) {
-		taskid = context.getThisTaskId();
-	}
-	
-	private String getMetricName(Integer taskid, String name) {
-		return "task-" + String.valueOf(taskid) + ":" + name;
-	}
-
-	public Gauge<?> registerGauge(String name, Gauge<?> gauge, MetricCallback<Gauge<?>> callback) {
-		String userMetricName = getMetricName(taskid, name);
-        Gauge<?> ret = Metrics.registerGauge(userMetricName, gauge);
-        Metrics.registerUserDefine(userMetricName, gauge, callback);
-		return ret;
-	}
-
-	public Counter registerCounter(String name, MetricCallback<Counter> callback) {
-		String userMetricName = getMetricName(taskid, name);
-        Counter ret = Metrics.registerCounter(userMetricName);
-        Metrics.registerUserDefine(userMetricName, ret, callback);
-		return ret;
-	}
-	
-	public Meter registerMeter(String name, MetricCallback<Meter> callback) {
-		String userMetricName = getMetricName(taskid, name);
-		Meter ret = Metrics.registerMeter(userMetricName);
-		Metrics.registerUserDefine(userMetricName, ret, callback);
-		return ret;
-	}
-	
-	public JStormTimer registerTimer(String name, MetricCallback<Timer> callback) {
-		String userMetricName = getMetricName(taskid, name);
-        JStormTimer ret = Metrics.registerTimer(userMetricName);
-        Metrics.registerUserDefine(userMetricName, ret, callback);		
-		return ret;
-	}
-	
-	public JStormHistogram registerHistogram(String name, MetricCallback<Histogram> callback) {
-		String userMetricName = getMetricName(taskid, name);
-        JStormHistogram ret = Metrics.registerHistograms(userMetricName);
-        Metrics.registerUserDefine(userMetricName, ret, callback);		
-		return ret;
-	}
-	
-	public boolean unregister(String name, Integer taskid) {
-		String userMetricName = getMetricName(taskid, name);
-		return Metrics.unregisterUserDefine(userMetricName);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java
deleted file mode 100644
index f140098..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.alibaba.jstorm.client.spout;
-
-import java.util.List;
-
-/**
- * This interface will list emit values when tuple success
- * 
- * if spout implement this interface, 
- * spout won't call ISpout.ack() when tuple success
- * 
- * @author longda
- */
-public interface IAckValueSpout {
-	void ack(Object msgId, List<Object> values);
-}