Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:27 UTC
[48/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/history_cn.md
----------------------------------------------------------------------
diff --git a/history_cn.md b/history_cn.md
index 1b90dd1..e57e9bb 100644
--- a/history_cn.md
+++ b/history_cn.md
@@ -1,5 +1,82 @@
[JStorm English introduction](http://42.121.19.155/jstorm/JStorm-introduce-en.pptx)
[JStorm Chinese introduction](http://42.121.19.155/jstorm/JStorm-introduce.pptx)
+#Release 2.0.4-SNAPSHOT
+## New features
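+1. Completely refactored the sampling system with a new Rollingwindow and Metric calculation scheme, especially for netty sampling data; in addition, metrics are no longer sent or received through zk
+2. Completely refactored the web-ui
+3. Introduced rocketdb and added a nimbus cache layer
+4. Reviewed all zk nodes and zk operations and removed unused zk operations
+5. Reviewed all thrift data structures and functions and removed unused rpc functions
+6. Merged jstorm-client/jstorm-client-extension/jstorm-core into jstorm-core
+7. Aligned dependencies with those of storm
+8. Synced the java code from apache-storm-0.10.0-beta1
+9. Switched the logging system to logback
+10. Upgraded thrift to apache thrift 0.9.2
+11. Optimized for very large topologies (more than 600 workers / 2000 tasks)
+12. Requires jdk7 or higher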
+
+#Release 0.9.7.1
+## New features
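+1. Added support for automatic Tuple batching to raise TPS and reduce message-processing latency (task.batch.tuple=true,task.msg.batch.size=4)
+2. When the local node cannot keep up with the load, localFirst automatically expands to external nodes
+3. A topology's configuration can be updated dynamically while the topology is running
+4. Topologies can set custom timeouts for the task heartbeat and task cleanup
+5. Added disruptor queue support for the non-blocking mode TimeoutBlockingWaitStrategy
+6. Added support for a message-send timeout at the Netty layer, and optimized the Netty Client configuration
+7. Updated the Tuple message-processing architecture. Removed the unnecessary total receive and total send queues, reducing the number of message hops, improving performance, and lowering jstorm's own cpu consumption.
+8. Added the client option "--include-jars", so that a topology can depend on additional jars at submission time
+9. nimbus/supervisor refuses to start if the address it obtains at startup is 127.0.0.0
+10. Added custom examples
+11. Merged the supervisor's zk sync thread syncSupervisor and worker sync thread syncProcess
+## Configuration changes
+1. Set the default heartbeat timeout to 4 minutes
+2. Changed the size of the netty thread pool clientScheduleService to 5
+## Bug fix
+1. Optimized gc parameters: workers with less than 4g of memory default to 4 gc threads; above 4g of memory, the number of gc threads is set to memory size/1g * 1.5
+2. Fixed a bug where the task heartbeat might not be updated in time when a bolt processes slowly
+3. Fixed an abnormal-wait bug that occurred in some cases when a netty connection reconnects
+4. Avoid creating the thrift client repeatedly when submitting a topology
+5. Fixed repeated binary downloads when a worker fails to start
+## Operations and scripts
+1. Improved the cleandisk.sh script so that it does not delete the current directory or /tmp/hsperfdata_admin/
+2. Added execute permission to the scripts under example
+3. Added the parameter supervisor.host.start: true/false, so that start.sh can control in bulk whether supervisors are started; by default the supervisor is started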
+
+#Release 0.9.7
+## New features
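+1. Implemented dynamic adjustment of topology parallelism. Without taking the topology offline, worker, spout, bolt, or ack can be scaled out or in dynamically. The rebalance command was extended to support this dynamic scaling.
+2. When resource isolation is enabled, added control of the upper limit on the cpu cores a worker may use
+3. Adjusted the task heartbeat update mechanism so that it correctly reflects the state of the spout/bolt execute main thread.
+4. Added support for a jstorm info prefix (clusterName, topologyName, ip:port, componentName, taskId, taskIndex) in worker and task logs
+5. When scheduling a topology, the supervisor heartbeat status is now checked, and tasks are not scheduled onto unresponsive supervisors
+6. Added a metric query API, e.g. task queue load and worker cpu and memory usage
+7. Added retries for topology jar downloads on the supervisor, so that a worker does not fail to start because the jar was corrupted during download
+8. Added a ZK Cache to speed up zk reads; some nodes are still read directly
+9. Added the thrift getVersion api; a warning is reported when the client and server versions differ
+10. Added a supervisor heartbeat check; tasks are not assigned to supervisors whose heartbeat has timed out
+11. Updated the data structure of the user defined metrics sent to Alimonitor
+12. Added the client exclude-jar feature: when a client submits a topology, jar conflicts can be resolved with exclude-jar and the classloader.
+## Configuration changes
+1. Changed the supervisor-to-nimbus heartbeat timeout to 180 seconds
+2. To avoid outofmemory errors, set the default value of storm.messaging.netty.max.pending to 4
+3. Set Nimbus memory to 4G
+4. Increased queue sizes: the task queue size is 1024, and the total send queue and total receive queue are 2048
+## Bug fix
+1. When a topology configured with many workers is restarted several times within a short period, the Supervisor could appear to hang because of an OOM in the Nimbus thrift thread
+2. When topologies are submitted at the same time, the later ones could fail
+3. tickTuple does not need to be acked; corrected the wrong failed-message statistics for tickTuple
+4. Fixed possible errors in default scheduling when use.old.assignment=true
+5. Fixed incomplete zk cleanup when a topology is deleted
+6. Fixed a failure to restart a topology while task assignment is in progress
+7. Fixed a race when several topologies are submitted at the same time
+8. Fixed an NPE when registering metrics
+9. Fixed zkTool failing to read the monitor znode
+10. Fixed an exception in local mode with classloader mode enabled
+11. Fixed logs being printed twice in local mode when a custom logback configuration is used
+## Operations & scripts
+1. Add rpm build spec
+2. Add deploy files of jstorm for rpm package building
+3. Changed the cronjob to run once an hour, and coredumps are now kept for 1 hour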
#Release 0.9.6.3
## New features
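Reading aid for item 1 under the 0.9.7.1 bug fixes in the history_cn.md hunk above: the gc-thread rule can be sketched roughly as below. This is not code from the commit. The class and method names are hypothetical, and two details are assumptions because the release note does not state them: a worker with exactly 4g is treated as falling under the scaled rule, and fractional results are rounded up.

```java
final class GcThreadRuleSketch {
    // Hypothetical helper: maps worker memory (in GB) to a gc thread count.
    // Below 4g the note says 4 threads; otherwise (memory / 1g) * 1.5.
    // Rounding up and the treatment of exactly 4g are assumptions.
    static int gcThreads(double workerMemoryGb) {
        if (workerMemoryGb < 4.0) {
            return 4;
        }
        return (int) Math.ceil(workerMemoryGb * 1.5);
    }

    public static void main(String[] args) {
        System.out.println("2g worker: " + gcThreads(2));
        System.out.println("8g worker: " + gcThreads(8));
    }
}
```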
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/pom.xml
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/pom.xml b/jstorm-client-extension/pom.xml
deleted file mode 100644
index 40650cd..0000000
--- a/jstorm-client-extension/pom.xml
+++ /dev/null
@@ -1,85 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-
- <parent>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-all</artifactId>
- <version>0.9.6.3</version>
- <relativePath>..</relativePath>
- </parent>
- <!-- <parent>
- <groupId>com.taobao</groupId>
- <artifactId>parent</artifactId>
- <version>1.0.2</version>
- </parent> -->
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-client-extension</artifactId>
- <version>0.9.6.3</version>
- <packaging>jar</packaging>
- <name>${project.artifactId}-${project.version}</name>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.3.2</version>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <executions>
- <execution>
- <id>attach-sources</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <properties>
- <powermock.version>1.4.11</powermock.version>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
- <dependencies>
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-client</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.1.41</version>
- </dependency>
- <dependency>
- <groupId>com.sun.net.httpserver</groupId>
- <artifactId>http</artifactId>
- <version>20070405</version>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4</artifactId>
- <version>${powermock.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <version>3.0.1</version>
- </dependency>
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-jvm</artifactId>
- <version>3.0.1</version>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchId.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchId.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchId.java
deleted file mode 100644
index e147db0..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchId.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.alibaba.jstorm.batch;
-
-import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-public class BatchId implements Serializable {
- private static final long serialVersionUID = 5720810158625748049L;
- protected final long id;
-
- protected BatchId(long id) {
- this.id = id;
- }
-
- public long getId() {
- return id;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (id ^ (id >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- BatchId other = (BatchId) obj;
- if (id != other.id)
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
- }
-
- private static AtomicLong staticId = new AtomicLong(0);
-
- public static void updateId(long id) {
- staticId.set(id);
- }
-
- public static BatchId mkInstance() {
- long id = staticId.incrementAndGet();
-
- return new BatchId(id);
- }
-
- public static BatchId incBatchId(BatchId old) {
- long other = old.getId();
- return new BatchId(other + 1);
- }
-
-}
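Reading aid for the removed BatchId class above: the sketch below isolates how its three static methods interact. It is a simplified stand-in under a hypothetical name (BatchIdSketch), not the removed class itself. It suggests that incBatchId derives a new id from an existing one without advancing the shared counter, so a later mkInstance call can hand out the same numeric id.

```java
import java.util.concurrent.atomic.AtomicLong;

final class BatchIdSketch {
    // Shared counter, mirroring the static AtomicLong in the removed class.
    private static final AtomicLong COUNTER = new AtomicLong(0);

    final long id;

    private BatchIdSketch(long id) {
        this.id = id;
    }

    // Resets the counter, e.g. to a value restored from external storage.
    static void updateId(long id) {
        COUNTER.set(id);
    }

    // Advances the counter and wraps the new value.
    static BatchIdSketch mkInstance() {
        return new BatchIdSketch(COUNTER.incrementAndGet());
    }

    // Derives old.id + 1 without touching the counter.
    static BatchIdSketch incBatchId(BatchIdSketch old) {
        return new BatchIdSketch(old.id + 1);
    }

    public static void main(String[] args) {
        updateId(41);
        BatchIdSketch a = mkInstance();
        BatchIdSketch b = incBatchId(a);
        BatchIdSketch c = mkInstance();
        System.out.println(a.id + " " + b.id + " " + c.id);
    }
}
```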
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java
deleted file mode 100644
index dc599f1..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package com.alibaba.jstorm.batch;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-
-import com.alibaba.jstorm.batch.impl.BatchSpoutTrigger;
-import com.alibaba.jstorm.batch.impl.CoordinatedBolt;
-import com.alibaba.jstorm.batch.util.BatchDef;
-
-public class BatchTopologyBuilder {
- private static final Logger LOG = Logger
- .getLogger(BatchTopologyBuilder.class);
-
- private TopologyBuilder topologyBuilder;
-
- private SpoutDeclarer spoutDeclarer;
-
- public BatchTopologyBuilder(String topologyName) {
- topologyBuilder = new TopologyBuilder();
-
- spoutDeclarer = topologyBuilder.setSpout(BatchDef.SPOUT_TRIGGER,
- new BatchSpoutTrigger(), 1);
- }
-
- public BoltDeclarer setSpout(String id, IBatchSpout spout, int paralel) {
-
- BoltDeclarer boltDeclarer = this
- .setBolt(id, (IBatchSpout) spout, paralel);
- boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
- BatchDef.COMPUTING_STREAM_ID);
-
- return boltDeclarer;
- }
-
- public BoltDeclarer setBolt(String id, IBasicBolt bolt, int paralel) {
- CoordinatedBolt coordinatedBolt = new CoordinatedBolt(bolt);
-
- BoltDeclarer boltDeclarer = topologyBuilder.setBolt(id,
- coordinatedBolt, paralel);
-
- if (bolt instanceof IPrepareCommit) {
- boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
- BatchDef.PREPARE_STREAM_ID);
- }
-
- if (bolt instanceof ICommitter) {
- boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
- BatchDef.COMMIT_STREAM_ID);
- boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
- BatchDef.REVERT_STREAM_ID);
- }
-
- if (bolt instanceof IPostCommit) {
- boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
- BatchDef.POST_STREAM_ID);
- }
-
- return boltDeclarer;
- }
-
- public TopologyBuilder getTopologyBuilder() {
- return topologyBuilder;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java
deleted file mode 100644
index d3d1178..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.alibaba.jstorm.batch;
-
-import java.io.Serializable;
-
-import backtype.storm.topology.IBasicBolt;
-
-public interface IBatchSpout extends IBasicBolt, ICommitter, Serializable {
-
- /**
- * input's field 0 is BatchId
- *
- * execute only receives trigger messages
- *
- * do the emitBatch operation in execute, on the stream whose ID is "batch/compute-stream"
- */
- //void execute(Tuple input, IBasicOutputCollector collector);
- /**
- * begin to ack batchId's data
- *
- * return value will be stored in ZK, so sometimes don't need special action
- *
- * @param id
- */
- //byte[] commit(BatchId id) throws FailedException;
-
- /**
- * begin to revert batchId's data
- *
- * If current task fails to commit batchId, it won't call revert(batchId)
- * If current task fails to revert batchId, JStorm won't call revert again.
- *
- * If the spout is not transactional, revert can be ignored
- *
- * @param id
- */
- //void revert(BatchId id, byte[] commitResult);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/ICommitter.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/ICommitter.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/ICommitter.java
deleted file mode 100644
index 9492398..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/ICommitter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.alibaba.jstorm.batch;
-
-import java.io.Serializable;
-
-import backtype.storm.topology.FailedException;
-
-/**
- * The fewer committers there are, the more stable the state is.
- * Don't need to do
- *
- * @author zhongyan.feng
- * @version
- */
-public interface ICommitter extends Serializable{
- /**
- * begin to commit batchId's data, then return the commit result
- * The commitResult will store into outside storage
- *
- * if failed to commit, please throw FailedException
- *
- *
- *
- * @param id
- */
- byte[] commit(BatchId id) throws FailedException;
-
- /**
- * begin to revert batchId's data
- *
- * If current task fails to commit batchId, it won't call revert(batchId)
- * If current task fails to revert batchId, JStorm won't call revert again.
- *
- * @param id
- */
- void revert(BatchId id, byte[] commitResult);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPostCommit.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPostCommit.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPostCommit.java
deleted file mode 100644
index b408a3f..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPostCommit.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package com.alibaba.jstorm.batch;
-
-import backtype.storm.topology.BasicOutputCollector;
-
-
-public interface IPostCommit {
- /**
- * Do after commit
- * Don't care failure of postCommit
- *
- * @param id
- */
- void postCommit(BatchId id, BasicOutputCollector collector);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java
deleted file mode 100644
index dd3da44..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.alibaba.jstorm.batch;
-
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.FailedException;
-
-/**
- * Called before commit, after finish batch
- *
- * @author zhongyan.feng
- */
-public interface IPrepareCommit {
-
- /**
- * Do prepare before commit
- *
- * @param id
- * @param collector
- */
- void prepareCommit(BatchId id, BasicOutputCollector collector) throws FailedException;
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java
deleted file mode 100644
index 63704fb..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.alibaba.jstorm.batch.impl;
-
-import java.io.Serializable;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-import com.alibaba.jstorm.batch.BatchId;
-import com.alibaba.jstorm.batch.util.BatchStatus;
-
-public class BatchSpoutMsgId implements Serializable{
-
- /** */
- private static final long serialVersionUID = 2899009971479957517L;
-
- private final BatchId batchId;
- private BatchStatus batchStatus;
-
- protected BatchSpoutMsgId(BatchId batchId, BatchStatus batchStatus) {
- this.batchId = batchId;
- this.batchStatus = batchStatus;
- }
-
- public static BatchSpoutMsgId mkInstance() {
- BatchId batchId = BatchId.mkInstance();
- BatchStatus batchStatus = BatchStatus.COMPUTING;
-
- return new BatchSpoutMsgId(batchId, batchStatus);
- }
-
-
- public BatchStatus getBatchStatus() {
- return batchStatus;
- }
-
- public void setBatchStatus(BatchStatus batchStatus) {
- this.batchStatus = batchStatus;
- }
-
- public BatchId getBatchId() {
- return batchId;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java
deleted file mode 100644
index d0c3e94..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java
+++ /dev/null
@@ -1,312 +0,0 @@
-package com.alibaba.jstorm.batch.impl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import com.alibaba.jstorm.batch.BatchId;
-import com.alibaba.jstorm.batch.util.BatchCommon;
-import com.alibaba.jstorm.batch.util.BatchDef;
-import com.alibaba.jstorm.batch.util.BatchStatus;
-import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.ClusterState;
-import com.alibaba.jstorm.utils.IntervalCheck;
-import com.alibaba.jstorm.utils.JStormUtils;
-
-/**
- * Strong Sequence
- *
- * @author zhongyan.feng
- * @version
- */
-public class BatchSpoutTrigger implements IRichSpout {
- /** */
- private static final long serialVersionUID = 7215109169247425954L;
-
- private static final Logger LOG = Logger.getLogger(BatchSpoutTrigger.class);
-
- private LinkedBlockingQueue<BatchSpoutMsgId> batchQueue;
-
- private transient ClusterState zkClient;
-
- private transient SpoutOutputCollector collector;
-
- private static final String ZK_NODE_PATH = "/trigger";
-
- private static BatchId currentBatchId = null;
-
- private Map conf;
-
- private String taskName;
-
- private IntervalCheck intervalCheck;
-
- /**
- * @throws Exception
- *
- */
- public void initMsgId() throws Exception {
- Long zkMsgId = null;
- byte[] data = zkClient.get_data(ZK_NODE_PATH, false);
- if (data != null) {
- String value = new String(data);
- try {
- zkMsgId = Long.valueOf(value);
- LOG.info("ZK msgId:" + zkMsgId);
- } catch (Exception e) {
- LOG.warn("Failed to get msgId ", e);
-
- }
-
- }
-
- if (zkMsgId != null) {
- BatchId.updateId(zkMsgId);
- }
-
- int max_spout_pending = JStormUtils.parseInt(
- conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 1);
-
- for (int i = 0; i < max_spout_pending; i++) {
- BatchSpoutMsgId msgId = BatchSpoutMsgId.mkInstance();
- if (currentBatchId == null) {
- currentBatchId = msgId.getBatchId();
- }
- batchQueue.offer(msgId);
- LOG.info("Push into queue," + msgId);
- }
-
- }
-
- @Override
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- batchQueue = new LinkedBlockingQueue<BatchSpoutMsgId>();
- this.collector = collector;
- this.conf = conf;
- taskName = context.getThisComponentId() + "_" + context.getThisTaskId();
-
- intervalCheck = new IntervalCheck();
-
- try {
- zkClient = BatchCommon.getZkClient(conf);
-
- initMsgId();
-
- } catch (Exception e) {
- LOG.error("", e);
- throw new RuntimeException("Failed to init");
- }
- LOG.info("Successfully open " + taskName);
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public void activate() {
- LOG.info("Activate " + taskName);
- }
-
- @Override
- public void deactivate() {
- LOG.info("Deactivate " + taskName);
- }
-
- protected String getStreamId(BatchStatus batchStatus) {
- if (batchStatus == BatchStatus.COMPUTING) {
- return BatchDef.COMPUTING_STREAM_ID;
- } else if (batchStatus == BatchStatus.PREPARE_COMMIT) {
- return BatchDef.PREPARE_STREAM_ID;
- } else if (batchStatus == BatchStatus.COMMIT) {
- return BatchDef.COMMIT_STREAM_ID;
- } else if (batchStatus == BatchStatus.POST_COMMIT) {
- return BatchDef.POST_STREAM_ID;
- } else if (batchStatus == BatchStatus.REVERT_COMMIT) {
- return BatchDef.REVERT_STREAM_ID;
- } else {
- LOG.error("Occur unkonw type BatchStatus " + batchStatus);
- throw new RuntimeException();
- }
- }
-
- protected boolean isCommitStatus(BatchStatus batchStatus) {
- if (batchStatus == BatchStatus.COMMIT) {
- return true;
- } else if (batchStatus == BatchStatus.REVERT_COMMIT) {
- return true;
- } else {
- return false;
- }
- }
-
- protected boolean isCommitWait(BatchSpoutMsgId msgId) {
-
- if (isCommitStatus(msgId.getBatchStatus()) == false) {
- return false;
- }
-
- // left status is commit status
- if (currentBatchId.getId() >= msgId.getBatchId().getId()) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public void nextTuple() {
- BatchSpoutMsgId msgId = null;
- try {
- msgId = batchQueue.poll(10, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOG.error("", e);
- }
- if (msgId == null) {
- return;
- }
-
- if (isCommitWait(msgId)) {
-
- batchQueue.offer(msgId);
- if (intervalCheck.check()) {
- LOG.info("Current msgId " + msgId
- + ", but current commit BatchId is " + currentBatchId);
- }else {
- LOG.debug("Current msgId " + msgId
- + ", but current commit BatchId is " + currentBatchId);
- }
-
- return;
- }
-
- String streamId = getStreamId(msgId.getBatchStatus());
- List<Integer> outTasks = collector.emit(streamId,
- new Values(msgId.getBatchId()), msgId);
- if (outTasks.isEmpty()) {
- forward(msgId);
- }
- return;
-
- }
-
- protected void mkMsgId(BatchSpoutMsgId oldMsgId) {
- synchronized (BatchSpoutMsgId.class) {
- if (currentBatchId.getId() <= oldMsgId.getBatchId().getId()) {
- // this is normal case
-
- byte[] data = String.valueOf(currentBatchId.getId()).getBytes();
- try {
- zkClient.set_data(ZK_NODE_PATH, data);
- } catch (Exception e) {
- LOG.error("Failed to update to ZK " + oldMsgId, e);
- }
-
- currentBatchId = BatchId.incBatchId(oldMsgId.getBatchId());
-
- } else {
- // bigger batchId has been failed, when old msgId finish
- // it will go here
-
- }
-
- }
-
- BatchSpoutMsgId newMsgId = BatchSpoutMsgId.mkInstance();
- batchQueue.offer(newMsgId);
- StringBuilder sb = new StringBuilder();
- sb.append("Create new BatchId,");
- sb.append("old:").append(oldMsgId);
- sb.append("new:").append(newMsgId);
- sb.append("currentBatchId:").append(currentBatchId);
- LOG.info(sb.toString());
- }
-
- protected void forward(BatchSpoutMsgId msgId) {
- BatchStatus status = msgId.getBatchStatus();
-
- BatchStatus newStatus = status.forward();
- if (newStatus == null) {
- // create new status
- mkMsgId(msgId);
- LOG.info("Finish old batch " + msgId);
-
- } else {
- msgId.setBatchStatus(newStatus);
- batchQueue.offer(msgId);
- LOG.info("Forward batch " + msgId);
- }
- }
-
- @Override
- public void ack(Object msgId) {
- if (msgId instanceof BatchSpoutMsgId) {
- forward((BatchSpoutMsgId) msgId);
- return;
- } else {
- LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":"
- + msgId);
- return;
- }
- }
-
- protected void handleFail(BatchSpoutMsgId msgId) {
- LOG.info("Failed batch " + msgId);
- BatchStatus status = msgId.getBatchStatus();
-
- BatchStatus newStatus = status.error();
- if (newStatus == BatchStatus.ERROR) {
- // create new status
- mkMsgId(msgId);
-
- } else {
-
- msgId.setBatchStatus(newStatus);
- batchQueue.offer(msgId);
-
- }
- }
-
- @Override
- public void fail(Object msgId) {
- if (msgId instanceof BatchSpoutMsgId) {
- handleFail((BatchSpoutMsgId) msgId);
- } else {
- LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":"
- + msgId);
- return;
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream(BatchDef.COMPUTING_STREAM_ID, new Fields(
- "BatchId"));
- declarer.declareStream(BatchDef.PREPARE_STREAM_ID,
- new Fields("BatchId"));
- declarer.declareStream(BatchDef.COMMIT_STREAM_ID, new Fields("BatchId"));
- declarer.declareStream(BatchDef.REVERT_STREAM_ID, new Fields("BatchId"));
- declarer.declareStream(BatchDef.POST_STREAM_ID, new Fields("BatchId"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> map = new HashMap<String, Object>();
- ConfigExtension.setSpoutSingleThread(map, true);
- return map;
- }
-
-}
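Reading aid for the removed BatchSpoutTrigger above: the ordering check in isCommitStatus/isCommitWait can be restated as a single predicate. The sketch below is a standalone restatement with hypothetical names (CommitOrderSketch, mustWait), using plain long ids in place of BatchId. The enum lists only the status names that appear in the diff; the real BatchStatus type is not shown here and may differ.

```java
final class CommitOrderSketch {
    // Status names taken from the removed code; the real enum may hold more.
    enum Status { COMPUTING, PREPARE_COMMIT, COMMIT, REVERT_COMMIT, POST_COMMIT }

    // Only COMMIT and REVERT_COMMIT are subject to strict ordering.
    static boolean isCommitStatus(Status status) {
        return status == Status.COMMIT || status == Status.REVERT_COMMIT;
    }

    // A commit-phase message waits while an earlier batch is still the
    // current one, i.e. while currentBatchId is smaller than its own id.
    static boolean mustWait(long currentBatchId, long msgBatchId, Status status) {
        return isCommitStatus(status) && currentBatchId < msgBatchId;
    }

    public static void main(String[] args) {
        System.out.println(mustWait(5, 6, Status.COMMIT));
        System.out.println(mustWait(5, 5, Status.COMMIT));
        System.out.println(mustWait(5, 6, Status.COMPUTING));
    }
}
```

In the removed nextTuple, a message for which this check holds is put back on the queue and retried on a later call, which is how commits are kept in batch-id order.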
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java
deleted file mode 100644
index 0f4720b..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java
+++ /dev/null
@@ -1,281 +0,0 @@
-package com.alibaba.jstorm.batch.impl;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.ReportedFailedException;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TimeCacheMap;
-
-import com.alibaba.jstorm.batch.BatchId;
-import com.alibaba.jstorm.batch.ICommitter;
-import com.alibaba.jstorm.batch.IPostCommit;
-import com.alibaba.jstorm.batch.IPrepareCommit;
-import com.alibaba.jstorm.batch.util.BatchCommon;
-import com.alibaba.jstorm.batch.util.BatchDef;
-import com.alibaba.jstorm.batch.util.BatchStatus;
-import com.alibaba.jstorm.cluster.ClusterState;
-
-public class CoordinatedBolt implements IRichBolt {
- private static final long serialVersionUID = 5720810158625748046L;
-
- public static Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
-
- private IBasicBolt delegate;
- private BasicOutputCollector basicCollector;
- private OutputCollector collector;
-
- private String taskId;
- private String taskName;
-
- private boolean isCommiter = false;
- private String zkCommitPath;
- private TimeCacheMap<Object, Object> commited;
-
- public CoordinatedBolt(IBasicBolt delegate) {
-
- this.delegate = delegate;
-
- }
-
- // use static variable to reduce zk connection
- private static ClusterState zkClient = null;
-
- public void mkCommitDir(Map conf) {
-
- try {
- zkClient = BatchCommon.getZkClient(conf);
-
- zkCommitPath = BatchDef.ZK_COMMIT_DIR + BatchDef.ZK_SEPERATOR
- + taskId;
- if (zkClient.node_existed(zkCommitPath, false)) {
- zkClient.delete_node(zkCommitPath);
- }
- zkClient.mkdirs(zkCommitPath);
-
- LOG.info(taskName + " successfully create commit path" + zkCommitPath);
- } catch (Exception e) {
- LOG.error("Failed to create zk node", e);
- throw new RuntimeException();
- }
- }
-
- public void prepare(Map conf, TopologyContext context,
- OutputCollector collector) {
-
- taskId = String.valueOf(context.getThisTaskId());
- taskName = context.getThisComponentId() + "_" + context.getThisTaskId();
-
- this.basicCollector = new BasicOutputCollector(collector);
- this.collector = collector;
-
- if (delegate instanceof ICommitter) {
- isCommiter = true;
- commited = new TimeCacheMap<Object, Object>(
- context.maxTopologyMessageTimeout());
- mkCommitDir(conf);
- }
-
- delegate.prepare(conf, context);
-
- }
-
- public void removeUseless(String path, int reserveSize) throws Exception {
- List<String> childs = zkClient.get_children(path, false);
- Collections.sort(childs, new Comparator<String>() {
-
- @Override
- public int compare(String o1, String o2) {
- try {
- Long v1 = Long.valueOf(o1);
- Long v2 = Long.valueOf(o2);
- return v1.compareTo(v2);
- }catch(Exception e) {
- return o1.compareTo(o2);
- }
-
- }
-
- });
-
- for (int index = 0; index < childs.size() - reserveSize; index++) {
- zkClient.delete_node(path + BatchDef.ZK_SEPERATOR
- + childs.get(index));
- }
- }
-
- public String getCommitPath(BatchId id) {
- return zkCommitPath + BatchDef.ZK_SEPERATOR + id.getId();
- }
-
- public void updateToZk(Object id, byte[] commitResult) {
- try {
-
- removeUseless(zkCommitPath, BatchDef.ZK_COMMIT_RESERVER_NUM);
-
- String path = getCommitPath((BatchId)id);
- byte[] data = commitResult;
- if (data == null) {
- data = new byte[0];
- }
- zkClient.set_data(path, data);
- LOG.info("Update " + path + " to zk");
- } catch (Exception e) {
- LOG.warn("Failed to update to zk,", e);
-
- }
-
- }
-
- public byte[] getCommittedData(Object id) {
- try {
- String path = getCommitPath((BatchId)id);
- byte[] data = zkClient.get_data(path, false);
-
- return data;
- } catch (Exception e) {
- LOG.error("Failed to visit ZK,", e);
- return null;
- }
- }
-
- public void handleRegular(Tuple tuple) {
- basicCollector.setContext(tuple);
- try {
- delegate.execute(tuple, basicCollector);
- collector.ack(tuple);
- } catch (FailedException e) {
- if (e instanceof ReportedFailedException) {
- collector.reportError(e);
- }
- collector.fail(tuple);
- }
-
- }
-
- public void handlePrepareCommit(Tuple tuple) {
- basicCollector.setContext(tuple);
- try {
- BatchId id = (BatchId) tuple.getValue(0);
- ((IPrepareCommit) delegate).prepareCommit(id, basicCollector);
- collector.ack(tuple);
- } catch (FailedException e) {
- if (e instanceof ReportedFailedException) {
- collector.reportError(e);
- }
- collector.fail(tuple);
- }
-
- }
-
- public void handleCommit(Tuple tuple) {
- Object id = tuple.getValue(0);
- try {
- byte[] commitResult = ((ICommitter) delegate).commit((BatchId) id);
-
- collector.ack(tuple);
-
- updateToZk(id, commitResult);
- commited.put(id, commitResult);
- } catch (Exception e) {
- LOG.error("Failed to commit ", e);
- collector.fail(tuple);
- }
- }
-
- public void handleRevert(Tuple tuple) {
- try {
- Object id = tuple.getValue(0);
- byte[] commitResult = null;
-
- if (commited.containsKey(id)) {
- commitResult = (byte[]) commited.get(id);
- } else {
- commitResult = getCommittedData(id);
- }
-
- if (commitResult != null) {
- ((ICommitter) delegate).revert((BatchId) id, commitResult);
- }
- } catch (Exception e) {
- LOG.error("Failed to revert,", e);
- }
-
- collector.ack(tuple);
- }
-
- public void handlePostCommit(Tuple tuple) {
-
- basicCollector.setContext(tuple);
- try {
- BatchId id = (BatchId) tuple.getValue(0);
- ((IPostCommit) delegate).postCommit(id, basicCollector);
-
- } catch (Exception e) {
- LOG.info("Failed to do postCommit,", e);
- }
- collector.ack(tuple);
- }
-
- public void execute(Tuple tuple) {
-
- BatchStatus batchStatus = getBatchStatus(tuple);
-
- if (batchStatus == BatchStatus.COMPUTING) {
- handleRegular(tuple);
- } else if (batchStatus == BatchStatus.PREPARE_COMMIT) {
- handlePrepareCommit(tuple);
- } else if (batchStatus == BatchStatus.COMMIT) {
- handleCommit(tuple);
- } else if (batchStatus == BatchStatus.REVERT_COMMIT) {
- handleRevert(tuple);
- } else if (batchStatus == BatchStatus.POST_COMMIT) {
- handlePostCommit(tuple);
- } else {
- throw new RuntimeException(
- "Receive commit tuple, but not committer");
- }
- }
-
- public void cleanup() {
- delegate.cleanup();
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- delegate.declareOutputFields(declarer);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return delegate.getComponentConfiguration();
- }
-
- private BatchStatus getBatchStatus(Tuple tuple) {
- String streamId = tuple.getSourceStreamId();
-
- if (streamId.equals(BatchDef.PREPARE_STREAM_ID)) {
- return BatchStatus.PREPARE_COMMIT;
- } else if (streamId.equals(BatchDef.COMMIT_STREAM_ID)) {
- return BatchStatus.COMMIT;
- } else if (streamId.equals(BatchDef.REVERT_STREAM_ID)) {
- return BatchStatus.REVERT_COMMIT;
- } else if (streamId.equals(BatchDef.POST_STREAM_ID)) {
- return BatchStatus.POST_COMMIT;
- } else {
- return BatchStatus.COMPUTING;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java
deleted file mode 100644
index f99edfa..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.alibaba.jstorm.batch.util;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.Config;
-
-import com.alibaba.jstorm.cluster.ClusterState;
-import com.alibaba.jstorm.cluster.DistributedClusterState;
-import com.alibaba.jstorm.utils.JStormUtils;
-
-public class BatchCommon {
- private static final Logger LOG = Logger.getLogger(BatchCommon.class);
-
- private static ClusterState zkClient = null;
-
- public static ClusterState getZkClient(Map conf) throws Exception {
- synchronized (BatchCommon.class) {
- if (zkClient != null) {
- return zkClient;
- }
-
- List<String> zkServers = null;
- if (conf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS) != null) {
- zkServers = (List<String>) conf
- .get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS);
- } else if (conf.get(Config.STORM_ZOOKEEPER_SERVERS) != null) {
- zkServers = (List<String>) conf
- .get(Config.STORM_ZOOKEEPER_SERVERS);
- } else {
- throw new RuntimeException("No setting zk");
- }
-
- int port = 2181;
- if (conf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT) != null) {
- port = JStormUtils.parseInt(
- conf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT), 2181);
- } else if (conf.get(Config.STORM_ZOOKEEPER_PORT) != null) {
- port = JStormUtils.parseInt(
- conf.get(Config.STORM_ZOOKEEPER_PORT), 2181);
- }
-
- String root = BatchDef.BATCH_ZK_ROOT;
- if (conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT) != null) {
- root = (String) conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
- }
-
- root = root + BatchDef.ZK_SEPERATOR
- + conf.get(Config.TOPOLOGY_NAME);
-
- Map<Object, Object> tmpConf = new HashMap<Object, Object>();
- tmpConf.putAll(conf);
- tmpConf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
- tmpConf.put(Config.STORM_ZOOKEEPER_ROOT, root);
- zkClient = new DistributedClusterState(tmpConf);
-
- LOG.info("Successfully connect ZK");
- return zkClient;
- }
-
- }
-}
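Note on the removed BatchCommon.getZkClient: the transactional.* settings take precedence over the storm.* ones, the port falls back to 2181, and the topology name is appended to the ZooKeeper root. The sketch below restates that lookup order only. The class name ZkSettingSketch and its helpers are hypothetical, the string keys are assumed to be the literal values of the backtype.storm.Config constants, and the fallback on an unparsable port is an assumption about JStormUtils.parseInt.

```java
import java.util.List;
import java.util.Map;

public class ZkSettingSketch {
    // Assumed literal values of the Config constants referenced in BatchCommon.
    static final String TX_SERVERS = "transactional.zookeeper.servers";
    static final String STORM_SERVERS = "storm.zookeeper.servers";
    static final String TX_PORT = "transactional.zookeeper.port";
    static final String STORM_PORT = "storm.zookeeper.port";
    static final String TX_ROOT = "transactional.zookeeper.root";
    static final String TOPOLOGY_NAME = "topology.name";

    // Returns the value of the first key that is present, or null if none is set.
    static Object firstSet(Map<String, Object> conf, String... keys) {
        for (String key : keys) {
            Object value = conf.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    // Transactional servers win over the cluster-wide servers; no setting is an error.
    public static List<String> resolveServers(Map<String, Object> conf) {
        Object servers = firstSet(conf, TX_SERVERS, STORM_SERVERS);
        if (servers == null) {
            throw new RuntimeException("No setting zk");
        }
        @SuppressWarnings("unchecked")
        List<String> result = (List<String>) servers;
        return result;
    }

    // Same precedence for the port, defaulting to 2181.
    public static int resolvePort(Map<String, Object> conf) {
        Object port = firstSet(conf, TX_PORT, STORM_PORT);
        if (port == null) {
            return 2181;
        }
        try {
            return Integer.parseInt(String.valueOf(port).trim());
        } catch (NumberFormatException e) {
            return 2181; // assumption: fall back to the default on bad input
        }
    }

    // Root defaults to "batch" (BatchDef.BATCH_ZK_ROOT) and gets "/<topology name>" appended.
    public static String resolveRoot(Map<String, Object> conf) {
        Object root = conf.get(TX_ROOT);
        String base = (root != null) ? (String) root : "batch";
        return base + "/" + conf.get(TOPOLOGY_NAME);
    }
}
```

As in the removed code, the default root "batch" carries no leading slash, so the resolved root is relative unless transactional.zookeeper.root supplies one.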
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchDef.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchDef.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchDef.java
deleted file mode 100644
index 7d87cfa..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchDef.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.alibaba.jstorm.batch.util;
-
-
-public class BatchDef {
- public static final String COMPUTING_STREAM_ID = "batch/compute-stream";
-
- public static final String PREPARE_STREAM_ID = "batch/parepare-stream";
-
- public static final String COMMIT_STREAM_ID = "batch/commit-stream";
-
- public static final String REVERT_STREAM_ID = "batch/revert-stream";
-
- public static final String POST_STREAM_ID = "batch/post-stream";
-
- public static final String SPOUT_TRIGGER = "spout_trigger";
-
- public static final String BATCH_ZK_ROOT = "batch";
-
- public static final String ZK_COMMIT_DIR = "/commit";
-
- public static final int ZK_COMMIT_RESERVER_NUM = 3;
-
- public static final String ZK_SEPERATOR = "/";
-
-
-}
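Note on the removed BatchDef constants: the bolt wrapper's getBatchStatus (removed earlier in this diff) maps the source stream ID of each tuple to a batch status, and any unrecognized stream counts as COMPUTING. A minimal sketch of that mapping follows. StreamDispatchSketch is a hypothetical name, and the status is returned as a plain string to keep the example self-contained. The prepare stream ID is spelled "batch/parepare-stream" in the removed source, and the sketch keeps that literal value.

```java
public class StreamDispatchSketch {
    // Maps a source stream ID to the name of the batch status it triggers.
    public static String statusFor(String streamId) {
        switch (streamId) {
            case "batch/parepare-stream": // spelling as in BatchDef.PREPARE_STREAM_ID
                return "PREPARE_COMMIT";
            case "batch/commit-stream":
                return "COMMIT";
            case "batch/revert-stream":
                return "REVERT_COMMIT";
            case "batch/post-stream":
                return "POST_COMMIT";
            default:
                // Every other stream, including "batch/compute-stream", is regular computing.
                return "COMPUTING";
        }
    }
}
```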
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchStatus.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchStatus.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchStatus.java
deleted file mode 100644
index e02daeb..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/batch/util/BatchStatus.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.alibaba.jstorm.batch.util;
-
-public enum BatchStatus {
- COMPUTING,
- PREPARE_COMMIT,
- COMMIT,
- REVERT_COMMIT,
- POST_COMMIT,
- ERROR;
-
-
-
- public BatchStatus forward() {
- if (this == COMPUTING) {
- return PREPARE_COMMIT;
- }else if (this == PREPARE_COMMIT) {
- return COMMIT;
- }else if (this == COMMIT) {
- return POST_COMMIT;
- }else {
- return null;
- }
- }
-
- public BatchStatus error() {
- if (this == COMMIT) {
- return REVERT_COMMIT;
- }else {
- return ERROR;
- }
- }
-
-};
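Note on the removed BatchStatus enum: forward() walks the success path COMPUTING, PREPARE_COMMIT, COMMIT, POST_COMMIT and returns null afterwards, while error() yields REVERT_COMMIT only from COMMIT and ERROR from every other state. The sketch below copies those two transitions into a nested enum and adds a hypothetical helper, happyPath(), that is not part of the original code, to list the success path.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchStatusSketch {
    public enum Status {
        COMPUTING, PREPARE_COMMIT, COMMIT, REVERT_COMMIT, POST_COMMIT, ERROR;

        // Next state on success; null when there is no further state.
        public Status forward() {
            switch (this) {
                case COMPUTING:
                    return PREPARE_COMMIT;
                case PREPARE_COMMIT:
                    return COMMIT;
                case COMMIT:
                    return POST_COMMIT;
                default:
                    return null;
            }
        }

        // Next state on failure; only a failed COMMIT is reverted.
        public Status error() {
            return this == COMMIT ? REVERT_COMMIT : ERROR;
        }
    }

    // Hypothetical helper: follows forward() from COMPUTING until it returns null.
    public static List<Status> happyPath() {
        List<Status> path = new ArrayList<>();
        for (Status s = Status.COMPUTING; s != null; s = s.forward()) {
            path.add(s);
        }
        return path;
    }
}
```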
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopDefaultKill.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopDefaultKill.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopDefaultKill.java
deleted file mode 100644
index f392f22..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopDefaultKill.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-/**
- * Killer callback
- *
- * @author yannian
- *
- */
-
-public class AsyncLoopDefaultKill extends RunnableCallback {
-
- @Override
- public <T> Object execute(T... args) {
- Exception e = (Exception) args[0];
- JStormUtils.halt_process(1, "Async loop died!");
- return e;
- }
-
- @Override
- public void run() {
- JStormUtils.halt_process(1, "Async loop died!");
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java
deleted file mode 100644
index 1b1e588..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import org.apache.log4j.Logger;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-/**
- * AsyncLoopThread's runnable
- *
- * Wraps the RunnableCallback fn; if an exception occurs, runs killfn
- *
- * @author yannian
- *
- */
-public class AsyncLoopRunnable implements Runnable {
- private static Logger LOG = Logger.getLogger(AsyncLoopRunnable.class);
-
- private RunnableCallback fn;
- private RunnableCallback killfn;
-
- public AsyncLoopRunnable(RunnableCallback fn, RunnableCallback killfn) {
- this.fn = fn;
- this.killfn = killfn;
- }
-
- private boolean needQuit(Object rtn) {
- if (rtn != null) {
- long sleepTime = Long.parseLong(String.valueOf(rtn));
- if (sleepTime < 0) {
- return true;
- }else if (sleepTime > 0) {
- JStormUtils.sleepMs(sleepTime * 1000);
- }
- }
- return false;
- }
-
- @Override
- public void run() {
-
- try {
- while (true) {
- Exception e = null;
-
- try {
- if (fn == null) {
- LOG.warn("fn==null");
- throw new RuntimeException("AsyncLoopRunnable no core function ");
- }
-
- fn.run();
-
- e = fn.error();
-
- } catch (Exception ex) {
- e = ex;
- }
- if (e != null) {
- fn.shutdown();
- throw e;
- }
- Object rtn = fn.getResult();
- if (this.needQuit(rtn)) {
- return;
- }
-
- }
- } catch (InterruptedException e) {
- LOG.info("Async loop interrupted!");
- } catch (Throwable e) {
- Object rtn = fn.getResult();
- if (this.needQuit(rtn)) {
- return;
- }else {
- LOG.error("Async loop died!", e);
- killfn.execute(e);
- }
- }
-
- }
-
-}
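Note on the removed AsyncLoopRunnable: the value returned by fn.getResult() controls the loop. A null result repeats immediately, a negative number ends the loop, and a positive number is treated as a sleep time in seconds before the next iteration. The sketch below shows only that convention. AsyncLoopSketch and runLoop are hypothetical names, a Supplier stands in for the RunnableCallback, and the error and kill handling of the original is left out.

```java
import java.util.function.Supplier;

public class AsyncLoopSketch {
    // Runs step repeatedly and returns how many times it ran before asking to quit.
    public static int runLoop(Supplier<Object> step) {
        int iterations = 0;
        while (true) {
            Object rtn = step.get();
            iterations++;
            if (rtn == null) {
                continue; // no result: run again immediately
            }
            long sleepSeconds = Long.parseLong(String.valueOf(rtn));
            if (sleepSeconds < 0) {
                return iterations; // negative result: quit the loop
            }
            if (sleepSeconds > 0) {
                try {
                    Thread.sleep(sleepSeconds * 1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop, like the "interrupted" branch.
                    Thread.currentThread().interrupt();
                    return iterations;
                }
            }
        }
    }
}
```

The result is parsed through String.valueOf, as in the removed needQuit, so a callback can return either a number or its string form.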
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java
deleted file mode 100644
index 534386d..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.utils.Time;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.SmartThread;
-
-/**
- * Timer thread wrapper: executes afn every several seconds; if something goes
- * wrong, runs kill_fn
- *
- *
- * @author yannian
- *
- */
-public class AsyncLoopThread implements SmartThread {
- private static final Logger LOG = Logger.getLogger(AsyncLoopThread.class);
-
- private Thread thread;
-
- private RunnableCallback afn;
-
- public AsyncLoopThread(RunnableCallback afn) {
- this.init(afn, false, Thread.NORM_PRIORITY, true);
- }
-
- public AsyncLoopThread(RunnableCallback afn, boolean daemon, int priority,
- boolean start) {
- this.init(afn, daemon, priority, start);
- }
-
- public AsyncLoopThread(RunnableCallback afn, boolean daemon,
- RunnableCallback kill_fn, int priority, boolean start) {
- this.init(afn, daemon, kill_fn, priority, start);
- }
-
- public void init(RunnableCallback afn, boolean daemon, int priority,
- boolean start) {
- RunnableCallback kill_fn = new AsyncLoopDefaultKill();
- this.init(afn, daemon, kill_fn, priority, start);
- }
-
- /**
- *
- * @param afn
- * @param daemon
- * @param kill_fn
- * (Exception e)
- * @param priority
- * @param args_fn
- * @param start
- */
- private void init(RunnableCallback afn, boolean daemon,
- RunnableCallback kill_fn, int priority, boolean start) {
- if (kill_fn == null) {
- kill_fn = new AsyncLoopDefaultKill();
- }
-
- Runnable runable = new AsyncLoopRunnable(afn, kill_fn);
- thread = new Thread(runable);
- String threadName = afn.getThreadName();
- if (threadName == null) {
- threadName = afn.getClass().getSimpleName();
- }
- thread.setName(threadName);
- thread.setDaemon(daemon);
- thread.setPriority(priority);
- thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- LOG.error("UncaughtException", e);
- JStormUtils.halt_process(1, "UncaughtException");
- }
- });
-
- this.afn = afn;
-
- if (start) {
- thread.start();
- }
-
- }
-
- @Override
- public void start() {
- thread.start();
- }
-
- @Override
- public void join() throws InterruptedException {
- thread.join();
- }
-
- // for test
- public void join(int times) throws InterruptedException {
- thread.join(times);
- }
-
- @Override
- public void interrupt() {
- thread.interrupt();
- }
-
- @Override
- public Boolean isSleeping() {
- return Time.isThreadWaiting(thread);
- }
-
- public Thread getThread() {
- return thread;
- }
-
- @Override
- public void cleanup() {
- // TODO Auto-generated method stub
- afn.cleanup();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/BaseCallback.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/BaseCallback.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/BaseCallback.java
deleted file mode 100644
index 31012fa..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/BaseCallback.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import com.alibaba.jstorm.callback.Callback;
-
-public class BaseCallback implements Callback {
-
- @Override
- public <T> Object execute(T... args) {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/Callback.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/Callback.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/Callback.java
deleted file mode 100644
index d832a71..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/Callback.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-/**
- * Callback interface
- *
- * @author lixin 2012-3-12
- *
- */
-public interface Callback {
-
- public <T> Object execute(T... args);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/ClusterStateCallback.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/ClusterStateCallback.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/ClusterStateCallback.java
deleted file mode 100644
index 2726b1d..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/ClusterStateCallback.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import com.alibaba.jstorm.callback.BaseCallback;
-
-public class ClusterStateCallback extends BaseCallback {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java
deleted file mode 100644
index bd4ce25..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-import com.alibaba.jstorm.zk.ZkEventTypes;
-import com.alibaba.jstorm.zk.ZkKeeperStates;
-
-/**
- * Default ZK watch callback
- *
- * @author yannian
- *
- */
-public class DefaultWatcherCallBack implements WatcherCallBack {
-
- private static Logger LOG = Logger.getLogger(DefaultWatcherCallBack.class);
-
- @Override
- public void execute(KeeperState state, EventType type, String path) {
- LOG.info("Zookeeper state update:" + ZkKeeperStates.getStateName(state)
- + "," + ZkEventTypes.getStateName(type) + "," + path);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/RunnableCallback.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/RunnableCallback.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/RunnableCallback.java
deleted file mode 100644
index ccee6e2..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/RunnableCallback.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-
-/**
- * Base Runnable/Callback function
- *
- * @author yannian
- *
- */
-public class RunnableCallback implements Runnable, Callback {
-
- @Override
- public <T> Object execute(T... args) {
- return null;
- }
-
- @Override
- public void run() {
-
- }
-
- public Exception error() {
- return null;
- }
-
- public Object getResult() {
- return null;
- }
-
- /**
- * Called when an exception occurs
- */
- public void shutdown() {
- }
-
- /**
- * Normal quit
- */
- public void cleanup() {
-
- }
-
- public String getThreadName() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/WatcherCallBack.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/WatcherCallBack.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/WatcherCallBack.java
deleted file mode 100644
index 382cc4b..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/callback/WatcherCallBack.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.alibaba.jstorm.callback;
-
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-public interface WatcherCallBack {
- public void execute(KeeperState state, EventType type, String path);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java
deleted file mode 100644
index f71a4ce..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java
+++ /dev/null
@@ -1,642 +0,0 @@
-package com.alibaba.jstorm.client;
-
-import java.security.InvalidParameterException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-public class ConfigExtension {
- /**
- * If this option is set, the spout or bolt logs all received
- * tuples.
- *
- * topology.debug only logs all sent tuples.
- */
- protected static final String TOPOLOGY_DEBUG_RECV_TUPLE = "topology.debug.recv.tuple";
-
- public static void setTopologyDebugRecvTuple(Map conf, boolean debug) {
- conf.put(TOPOLOGY_DEBUG_RECV_TUPLE, Boolean.valueOf(debug));
- }
-
- public static Boolean isTopologyDebugRecvTuple(Map conf) {
- return JStormUtils.parseBoolean(conf.get(TOPOLOGY_DEBUG_RECV_TUPLE),
- false);
- }
-
- /**
- * Port number of the daemon HTTP server
- */
- private static final Integer DEFAULT_DEAMON_HTTPSERVER_PORT = 7621;
-
- protected static final String SUPERVISOR_DEAMON_HTTPSERVER_PORT = "supervisor.deamon.logview.port";
-
- public static Integer getSupervisorDeamonHttpserverPort(Map conf) {
- return JStormUtils.parseInt(
- conf.get(SUPERVISOR_DEAMON_HTTPSERVER_PORT),
- DEFAULT_DEAMON_HTTPSERVER_PORT + 1);
- }
-
- protected static final String NIMBUS_DEAMON_HTTPSERVER_PORT = "nimbus.deamon.logview.port";
-
- public static Integer getNimbusDeamonHttpserverPort(Map conf) {
- return JStormUtils.parseInt(conf.get(NIMBUS_DEAMON_HTTPSERVER_PORT),
- DEFAULT_DEAMON_HTTPSERVER_PORT);
- }
-
- /**
- * Worker gc parameter
- *
- *
- */
- protected static final String WORKER_GC_CHILDOPTS = "worker.gc.childopts";
-
- public static void setWorkerGc(Map conf, String gc) {
- conf.put(WORKER_GC_CHILDOPTS, gc);
- }
-
- public static String getWorkerGc(Map conf) {
- return (String) conf.get(WORKER_GC_CHILDOPTS);
- }
-
- protected static final String WOREKER_REDIRECT_OUTPUT = "worker.redirect.output";
-
- public static boolean getWorkerRedirectOutput(Map conf) {
- Object result = conf.get(WOREKER_REDIRECT_OUTPUT);
- if (result == null)
- return true;
- return (Boolean) result;
- }
-
- protected static final String WOREKER_REDIRECT_OUTPUT_FILE = "worker.redirect.output.file";
-
- public static void setWorkerRedirectOutputFile(Map conf, String outputPath) {
- conf.put(WOREKER_REDIRECT_OUTPUT_FILE, outputPath);
- }
-
- public static String getWorkerRedirectOutputFile(Map conf) {
- return (String)conf.get(WOREKER_REDIRECT_OUTPUT_FILE);
- }
-
- /**
- * Usually a spout finishes prepare before the bolts, so the spout waits
- * several seconds to let the bolts finish preparation
- *
- * By default, the setting is 30 seconds
- */
- protected static final String SPOUT_DELAY_RUN = "spout.delay.run";
-
- public static void setSpoutDelayRunSeconds(Map conf, int delay) {
- conf.put(SPOUT_DELAY_RUN, Integer.valueOf(delay));
- }
-
- public static int getSpoutDelayRunSeconds(Map conf) {
- return JStormUtils.parseInt(conf.get(SPOUT_DELAY_RUN), 30);
- }
-
- /**
- * Default ZMQ Pending queue size
- */
- public static final int DEFAULT_ZMQ_MAX_QUEUE_MSG = 1000;
-
- /**
- * Number of memory slots one task allocates; the default setting is 1
- */
- protected static final String MEM_SLOTS_PER_TASK = "memory.slots.per.task";
-
- @Deprecated
- public static void setMemSlotPerTask(Map conf, int slotNum) {
- if (slotNum < 1) {
- throw new InvalidParameterException();
- }
- conf.put(MEM_SLOTS_PER_TASK, Integer.valueOf(slotNum));
- }
-
- /**
- * Number of CPU slots one task uses; the default setting is 1
- */
- protected static final String CPU_SLOTS_PER_TASK = "cpu.slots.per.task";
-
- @Deprecated
- public static void setCpuSlotsPerTask(Map conf, int slotNum) {
- if (slotNum < 1) {
- throw new InvalidParameterException();
- }
- conf.put(CPU_SLOTS_PER_TASK, Integer.valueOf(slotNum));
- }
-
- /**
- * If this is set, the component's tasks must run on different nodes.
- * This conflicts with USE_SINGLE_NODE
- */
- protected static final String TASK_ON_DIFFERENT_NODE = "task.on.differ.node";
-
- public static void setTaskOnDifferentNode(Map conf, boolean isIsolate) {
- conf.put(TASK_ON_DIFFERENT_NODE, Boolean.valueOf(isIsolate));
- }
-
- public static boolean isTaskOnDifferentNode(Map conf) {
- return JStormUtils
- .parseBoolean(conf.get(TASK_ON_DIFFERENT_NODE), false);
- }
-
- protected static final String SUPERVISOR_ENABLE_CGROUP = "supervisor.enable.cgroup";
-
- public static boolean isEnableCgroup(Map conf) {
- return JStormUtils.parseBoolean(conf.get(SUPERVISOR_ENABLE_CGROUP),
- false);
- }
-
- /**
- * If the component or topology configuration sets "use.old.assignment",
- * the old assignment is tried first
- */
- protected static final String USE_OLD_ASSIGNMENT = "use.old.assignment";
-
- public static void setUseOldAssignment(Map conf, boolean useOld) {
- conf.put(USE_OLD_ASSIGNMENT, Boolean.valueOf(useOld));
- }
-
- public static boolean isUseOldAssignment(Map conf) {
- return JStormUtils.parseBoolean(conf.get(USE_OLD_ASSIGNMENT), false);
- }
-
- /**
- * The supervisor's hostname
- */
- protected static final String SUPERVISOR_HOSTNAME = "supervisor.hostname";
- public static final Object SUPERVISOR_HOSTNAME_SCHEMA = String.class;
-
- public static String getSupervisorHost(Map conf) {
- return (String) conf.get(SUPERVISOR_HOSTNAME);
- }
-
- protected static final String SUPERVISOR_USE_IP = "supervisor.use.ip";
-
- public static boolean isSupervisorUseIp(Map conf) {
- return JStormUtils.parseBoolean(conf.get(SUPERVISOR_USE_IP), false);
- }
-
- protected static final String NIMBUS_USE_IP = "nimbus.use.ip";
-
- public static boolean isNimbusUseIp(Map conf) {
- return JStormUtils.parseBoolean(conf.get(NIMBUS_USE_IP), false);
- }
-
- protected static final String TOPOLOGY_ENABLE_CLASSLOADER = "topology.enable.classloader";
-
- public static boolean isEnableTopologyClassLoader(Map conf) {
- return JStormUtils.parseBoolean(conf.get(TOPOLOGY_ENABLE_CLASSLOADER),
- false);
- }
-
- public static void setEnableTopologyClassLoader(Map conf, boolean enable) {
- conf.put(TOPOLOGY_ENABLE_CLASSLOADER, Boolean.valueOf(enable));
- }
-
- protected static String CLASSLOADER_DEBUG = "classloader.debug";
-
- public static boolean isEnableClassloaderDebug(Map conf) {
- return JStormUtils.parseBoolean(conf.get(CLASSLOADER_DEBUG), false);
- }
-
- public static void setEnableClassloaderDebug(Map conf, boolean enable) {
- conf.put(CLASSLOADER_DEBUG, enable);
- }
-
- protected static final String CONTAINER_NIMBUS_HEARTBEAT = "container.nimbus.heartbeat";
-
- /**
- * Whether nimbus runs inside an Apsara/Yarn container
- *
- * @param conf
- * @return
- */
- public static boolean isEnableContainerNimbus() {
- String path = System.getenv(CONTAINER_NIMBUS_HEARTBEAT);
-
- if (StringUtils.isBlank(path)) {
- return false;
- } else {
- return true;
- }
- }
-
- /**
- * Get the Apsara/Yarn nimbus container's heartbeat dir
- *
- * @param conf
- * @return
- */
- public static String getContainerNimbusHearbeat() {
- return System.getenv(CONTAINER_NIMBUS_HEARTBEAT);
- }
-
- protected static final String CONTAINER_SUPERVISOR_HEARTBEAT = "container.supervisor.heartbeat";
-
- /**
- * Whether the supervisor runs inside an Apsara/Yarn supervisor
- * container
- *
- * @param conf
- * @return
- */
- public static boolean isEnableContainerSupervisor() {
- String path = System.getenv(CONTAINER_SUPERVISOR_HEARTBEAT);
-
- if (StringUtils.isBlank(path)) {
- return false;
- } else {
- return true;
- }
- }
-
- /**
- * Get the Apsara/Yarn supervisor container's heartbeat dir
- *
- * @param conf
- * @return
- */
- public static String getContainerSupervisorHearbeat() {
- return (String) System.getenv(CONTAINER_SUPERVISOR_HEARTBEAT);
- }
-
- protected static final String CONTAINER_HEARTBEAT_TIMEOUT_SECONDS = "container.heartbeat.timeout.seconds";
-
- public static int getContainerHeartbeatTimeoutSeconds(Map conf) {
- return JStormUtils.parseInt(
- conf.get(CONTAINER_HEARTBEAT_TIMEOUT_SECONDS), 240);
- }
-
- protected static final String CONTAINER_HEARTBEAT_FREQUENCE = "container.heartbeat.frequence";
-
- public static int getContainerHeartbeatFrequence(Map conf) {
- return JStormUtils
- .parseInt(conf.get(CONTAINER_HEARTBEAT_FREQUENCE), 10);
- }
-
- protected static final String JAVA_SANDBOX_ENABLE = "java.sandbox.enable";
-
- public static boolean isJavaSandBoxEnable(Map conf) {
- return JStormUtils.parseBoolean(conf.get(JAVA_SANDBOX_ENABLE), false);
- }
-
- protected static String SPOUT_SINGLE_THREAD = "spout.single.thread";
-
- public static boolean isSpoutSingleThread(Map conf) {
- return JStormUtils.parseBoolean(conf.get(SPOUT_SINGLE_THREAD), false);
- }
-
- public static void setSpoutSingleThread(Map conf, boolean enable) {
- conf.put(SPOUT_SINGLE_THREAD, enable);
- }
-
- protected static String WORKER_STOP_WITHOUT_SUPERVISOR = "worker.stop.without.supervisor";
-
- public static boolean isWorkerStopWithoutSupervisor(Map conf) {
- return JStormUtils.parseBoolean(
- conf.get(WORKER_STOP_WITHOUT_SUPERVISOR), false);
- }
-
- protected static String CGROUP_ROOT_DIR = "supervisor.cgroup.rootdir";
-
- public static String getCgroupRootDir(Map conf) {
- return (String) conf.get(CGROUP_ROOT_DIR);
- }
-
- protected static String NETTY_TRANSFER_ASYNC_AND_BATCH = "storm.messaging.netty.transfer.async.batch";
-
- public static boolean isNettyTransferAsyncBatch(Map conf) {
- return JStormUtils.parseBoolean(
- conf.get(NETTY_TRANSFER_ASYNC_AND_BATCH), true);
- }
-
- protected static final String USE_USERDEFINE_ASSIGNMENT = "use.userdefine.assignment";
-
- public static void setUserDefineAssignment(Map conf,
- List<WorkerAssignment> userDefines) {
- List<String> ret = new ArrayList<String>();
- for (WorkerAssignment worker : userDefines) {
- ret.add(Utils.to_json(worker));
- }
- conf.put(USE_USERDEFINE_ASSIGNMENT, ret);
- }
-
- public static List<WorkerAssignment> getUserDefineAssignment(Map conf) {
- List<WorkerAssignment> ret = new ArrayList<WorkerAssignment>();
- if (conf.get(USE_USERDEFINE_ASSIGNMENT) == null)
- return ret;
- for (String worker : (List<String>) conf.get(USE_USERDEFINE_ASSIGNMENT)) {
- ret.add(WorkerAssignment.parseFromObj(Utils.from_json(worker)));
- }
- return ret;
- }
-
- protected static final String MEMSIZE_PER_WORKER = "worker.memory.size";
-
- public static void setMemSizePerWorker(Map conf, long memSize) {
- conf.put(MEMSIZE_PER_WORKER, memSize);
- }
-
- public static void setMemSizePerWorkerByKB(Map conf, long memSize) {
- long size = memSize * 1024l;
- setMemSizePerWorker(conf, size);
- }
-
- public static void setMemSizePerWorkerByMB(Map conf, long memSize) {
- long size = memSize * 1024l;
- setMemSizePerWorkerByKB(conf, size);
- }
-
- public static void setMemSizePerWorkerByGB(Map conf, long memSize) {
- long size = memSize * 1024l;
- setMemSizePerWorkerByMB(conf, size);
- }
-
- public static long getMemSizePerWorker(Map conf) {
- long size = JStormUtils.parseLong(conf.get(MEMSIZE_PER_WORKER),
- JStormUtils.SIZE_1_G * 2);
- return size > 0 ? size : JStormUtils.SIZE_1_G * 2;
- }
-
- protected static final String CPU_SLOT_PER_WORKER = "worker.cpu.slot.num";
-
- public static void setCpuSlotNumPerWorker(Map conf, int slotNum) {
- conf.put(CPU_SLOT_PER_WORKER, slotNum);
- }
-
- public static int getCpuSlotPerWorker(Map conf) {
- int slot = JStormUtils.parseInt(conf.get(CPU_SLOT_PER_WORKER), 1);
- return slot > 0 ? slot : 1;
- }
-
- protected static String TOPOLOGY_PERFORMANCE_METRICS = "topology.performance.metrics";
-
- public static boolean isEnablePerformanceMetrics(Map conf) {
- return JStormUtils.parseBoolean(conf.get(TOPOLOGY_PERFORMANCE_METRICS),
- true);
- }
-
- public static void setPerformanceMetrics(Map conf, boolean isEnable) {
- conf.put(TOPOLOGY_PERFORMANCE_METRICS, isEnable);
- }
-
- protected static String NETTY_BUFFER_THRESHOLD_SIZE = "storm.messaging.netty.buffer.threshold";
-
- public static long getNettyBufferThresholdSize(Map conf) {
- return JStormUtils.parseLong(conf.get(NETTY_BUFFER_THRESHOLD_SIZE),
- 8 *JStormUtils.SIZE_1_M);
- }
-
- public static void setNettyBufferThresholdSize(Map conf, long size) {
- conf.put(NETTY_BUFFER_THRESHOLD_SIZE, size);
- }
-
- protected static String NETTY_MAX_SEND_PENDING = "storm.messaging.netty.max.pending";
-
- public static void setNettyMaxSendPending(Map conf, long pending) {
- conf.put(NETTY_MAX_SEND_PENDING, pending);
- }
-
- public static long getNettyMaxSendPending(Map conf) {
- return JStormUtils.parseLong(conf.get(NETTY_MAX_SEND_PENDING), 16);
- }
-
- protected static String DISRUPTOR_USE_SLEEP = "disruptor.use.sleep";
-
- public static boolean isDisruptorUseSleep(Map conf) {
- return JStormUtils.parseBoolean(conf.get(DISRUPTOR_USE_SLEEP), true);
- }
-
- public static void setDisruptorUseSleep(Map conf, boolean useSleep) {
- conf.put(DISRUPTOR_USE_SLEEP, useSleep);
- }
-
- public static boolean isTopologyContainAcker(Map conf) {
- int num = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), 1);
- if (num > 0) {
- return true;
- }else {
- return false;
- }
- }
-
- protected static String NETTY_SYNC_MODE = "storm.messaging.netty.sync.mode";
-
- public static boolean isNettySyncMode(Map conf) {
- return JStormUtils.parseBoolean(conf.get(NETTY_SYNC_MODE), false);
- }
-
- public static void setNettySyncMode(Map conf, boolean sync) {
- conf.put(NETTY_SYNC_MODE, sync);
- }
-
- protected static String NETTY_ASYNC_BLOCK = "storm.messaging.netty.async.block";
- public static boolean isNettyASyncBlock(Map conf) {
- return JStormUtils.parseBoolean(conf.get(NETTY_ASYNC_BLOCK), true);
- }
-
- public static void setNettyASyncBlock(Map conf, boolean block) {
- conf.put(NETTY_ASYNC_BLOCK, block);
- }
-
- protected static String ALIMONITOR_METRICS_POST = "topology.alimonitor.metrics.post";
-
- public static boolean isAlimonitorMetricsPost(Map conf) {
- return JStormUtils.parseBoolean(conf.get(ALIMONITOR_METRICS_POST), true);
- }
-
- public static void setAlimonitorMetricsPost(Map conf, boolean post) {
- conf.put(ALIMONITOR_METRICS_POST, post);
- }
-
- protected static String TASK_CLEANUP_TIMEOUT_SEC = "task.cleanup.timeout.sec";
-
- public static int getTaskCleanupTimeoutSec(Map conf) {
- return JStormUtils.parseInt(conf.get(TASK_CLEANUP_TIMEOUT_SEC), 10);
- }
-
- public static void setTaskCleanupTimeoutSec(Map conf, int timeout) {
- conf.put(TASK_CLEANUP_TIMEOUT_SEC, timeout);
- }
-
- protected static String UI_CLUSTERS = "ui.clusters";
- protected static String UI_CLUSTER_NAME = "name";
- protected static String UI_CLUSTER_ZK_ROOT = "zkRoot";
- protected static String UI_CLUSTER_ZK_SERVERS = "zkServers";
- protected static String UI_CLUSTER_ZK_PORT = "zkPort";
-
- public static List<Map> getUiClusters(Map conf) {
- return (List<Map>) conf.get(UI_CLUSTERS);
- }
-
- public static void setUiClusters(Map conf, List<Map> uiClusters) {
- conf.put(UI_CLUSTERS, uiClusters);
- }
-
- public static Map getUiClusterInfo(List<Map> uiClusters, String name) {
- Map ret = null;
- for (Map cluster : uiClusters) {
- String clusterName = getUiClusterName(cluster);
- if (clusterName.equals(name)) {
- ret = cluster;
- break;
- }
- }
-
- return ret;
- }
-
- public static String getUiClusterName(Map uiCluster) {
- return (String) uiCluster.get(UI_CLUSTER_NAME);
- }
-
- public static String getUiClusterZkRoot(Map uiCluster) {
- return (String) uiCluster.get(UI_CLUSTER_ZK_ROOT);
- }
-
- public static List<String> getUiClusterZkServers(Map uiCluster) {
- return (List<String>) uiCluster.get(UI_CLUSTER_ZK_SERVERS);
- }
-
- public static Integer getUiClusterZkPort(Map uiCluster) {
- return JStormUtils.parseInt(uiCluster.get(UI_CLUSTER_ZK_PORT));
- }
-
- protected static String SPOUT_PEND_FULL_SLEEP = "spout.pending.full.sleep";
-
- public static boolean isSpoutPendFullSleep(Map conf) {
- return JStormUtils.parseBoolean(conf.get(SPOUT_PEND_FULL_SLEEP), false);
- }
-
- public static void setSpoutPendFullSleep(Map conf, boolean sleep) {
- conf.put(SPOUT_PEND_FULL_SLEEP, sleep);
-
- }
-
- protected static String LOGVIEW_ENCODING = "supervisor.deamon.logview.encoding";
- protected static String UTF8 = "utf-8";
-
- public static String getLogViewEncoding(Map conf) {
- String ret = (String) conf.get(LOGVIEW_ENCODING);
- if (ret == null) ret = UTF8;
- return ret;
- }
-
- public static void setLogViewEncoding(Map conf, String enc) {
- conf.put(LOGVIEW_ENCODING, enc);
- }
-
- public static String TASK_STATUS_ACTIVE = "Active";
- public static String TASK_STATUS_STARTING = "Starting";
-
- protected static String ALIMONITOR_TOPO_METIRC_NAME = "topology.alimonitor.topo.metrics.name";
- protected static String ALIMONITOR_TASK_METIRC_NAME = "topology.alimonitor.task.metrics.name";
- protected static String ALIMONITOR_WORKER_METIRC_NAME = "topology.alimonitor.worker.metrics.name";
- protected static String ALIMONITOR_USER_METIRC_NAME = "topology.alimonitor.user.metrics.name";
-
- public static String getAlmonTopoMetricName(Map conf) {
- return (String) conf.get(ALIMONITOR_TOPO_METIRC_NAME);
- }
-
- public static String getAlmonTaskMetricName(Map conf) {
- return (String) conf.get(ALIMONITOR_TASK_METIRC_NAME);
- }
-
- public static String getAlmonWorkerMetricName(Map conf) {
- return (String) conf.get(ALIMONITOR_WORKER_METIRC_NAME);
- }
-
- public static String getAlmonUserMetricName(Map conf) {
- return (String) conf.get(ALIMONITOR_USER_METIRC_NAME);
- }
-
- protected static String SPOUT_PARALLELISM = "topology.spout.parallelism";
- protected static String BOLT_PARALLELISM = "topology.bolt.parallelism";
-
- public static Integer getSpoutParallelism(Map conf, String componentName) {
- Integer ret = null;
- Map<String, String> map = (Map<String, String>)(conf.get(SPOUT_PARALLELISM));
- if(map != null) ret = JStormUtils.parseInt(map.get(componentName));
- return ret;
- }
-
- public static Integer getBoltParallelism(Map conf, String componentName) {
- Integer ret = null;
- Map<String, String> map = (Map<String, String>)(conf.get(BOLT_PARALLELISM));
- if(map != null) ret = JStormUtils.parseInt(map.get(componentName));
- return ret;
- }
-
- protected static String TOPOLOGY_BUFFER_SIZE_LIMITED = "topology.buffer.size.limited";
-
- public static void setTopologyBufferSizeLimited(Map conf, boolean limited) {
- conf.put(TOPOLOGY_BUFFER_SIZE_LIMITED, limited);
- }
-
- public static boolean getTopologyBufferSizeLimited(Map conf) {
- boolean isSynchronized = isNettySyncMode(conf);
- if (isSynchronized == true) {
- return true;
- }
-
- return JStormUtils.parseBoolean(conf.get(TOPOLOGY_BUFFER_SIZE_LIMITED), true);
-
- }
-
- protected static String SUPERVISOR_SLOTS_PORTS_BASE = "supervisor.slots.ports.base";
-
- public static int getSupervisorSlotsPortsBase(Map conf) {
- return JStormUtils.parseInt(conf.get(SUPERVISOR_SLOTS_PORTS_BASE), 6800);
- }
-
- // SUPERVISOR_SLOTS_PORTS_BASE has no setter; it must be set through configuration
-
- protected static String SUPERVISOR_SLOTS_PORT_CPU_WEIGHT = "supervisor.slots.port.cpu.weight";
- public static double getSupervisorSlotsPortCpuWeight(Map conf) {
- Object value = conf.get(SUPERVISOR_SLOTS_PORT_CPU_WEIGHT);
- Double ret = JStormUtils.convertToDouble(value);
- if (ret == null) {
- return 1.0;
- }else {
- return ret;
- }
- }
- // SUPERVISOR_SLOTS_PORT_CPU_WEIGHT has no setter; it must be set through configuration
-
- protected static String USER_DEFINED_LOG4J_CONF = "user.defined.log4j.conf";
-
- public static String getUserDefinedLog4jConf(Map conf) {
- return (String)conf.get(USER_DEFINED_LOG4J_CONF);
- }
-
- public static void setUserDefinedLog4jConf(Map conf, String fileName) {
- conf.put(USER_DEFINED_LOG4J_CONF, fileName);
- }
-
- protected static String USER_DEFINED_LOGBACK_CONF = "user.defined.logback.conf";
-
- public static String getUserDefinedLogbackConf(Map conf) {
- return (String)conf.get(USER_DEFINED_LOGBACK_CONF);
- }
-
- public static void setUserDefinedLogbackConf(Map conf, String fileName) {
- conf.put(USER_DEFINED_LOGBACK_CONF, fileName);
- }
-
- protected static String TASK_ERROR_INFO_REPORT_INTERVAL = "topology.task.error.report.interval";
-
- public static Integer getTaskErrorReportInterval(Map conf) {
- return JStormUtils.parseInt(conf.get(TASK_ERROR_INFO_REPORT_INTERVAL), 60);
- }
-
- public static void setTaskErrorReportInterval(Map conf, Integer interval) {
- conf.put(TASK_ERROR_INFO_REPORT_INTERVAL, interval);
- }
-}
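Note on the removed ConfigExtension accessors: most getters above read a key from the untyped conf map, fall back to a default when the key is absent, and in a few cases (for example getCpuSlotPerWorker) also reject non-positive values. The following is a minimal, self-contained sketch of that pattern. The class name ConfDefaultsSketch and the local parseInt helper are hypothetical stand-ins; the real JStormUtils.parseInt is not shown in this diff and may behave differently on malformed input.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not part of the JStorm code base.
class ConfDefaultsSketch {
    // Key copied from the removed ConfigExtension class.
    static final String CPU_SLOT_PER_WORKER = "worker.cpu.slot.num";

    // Assumed stand-in for JStormUtils.parseInt(Object, int):
    // returns defaultValue when the value is missing or not numeric.
    static int parseInt(Object value, int defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        try {
            return Integer.parseInt(value.toString().trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    // Mirrors the removed getter: default 1, and never less than 1.
    static int getCpuSlotPerWorker(Map<String, Object> conf) {
        int slot = parseInt(conf.get(CPU_SLOT_PER_WORKER), 1);
        return slot > 0 ? slot : 1;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<String, Object>();
        System.out.println(getCpuSlotPerWorker(conf)); // key absent
        conf.put(CPU_SLOT_PER_WORKER, "4");
        System.out.println(getCpuSlotPerWorker(conf)); // string value
        conf.put(CPU_SLOT_PER_WORKER, 0);
        System.out.println(getCpuSlotPerWorker(conf)); // non-positive value
    }
}
```

Clamping in the getter rather than the setter means values loaded from a configuration file are sanitized too, not only values set through the API.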
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java
deleted file mode 100644
index 9eac326..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java
+++ /dev/null
@@ -1,264 +0,0 @@
-package com.alibaba.jstorm.client;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.apache.log4j.Logger;
-import org.json.simple.JSONAware;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.utils.Utils;
-
-
-public class WorkerAssignment extends WorkerSlot implements Serializable,
- JSONAware {
- private static final Logger LOG = Logger.getLogger(WorkerAssignment.class);
-
-
- private static final long serialVersionUID = -3483047434535537861L;
-
- private Map<String, Integer> componentToNum = new HashMap<String, Integer>();
-
- private long mem;
-
- private int cpu;
-
- private String hostName;
-
- private String jvm;
-
- private static final String COMPONENTTONUM_TAG = "componentToNum";
- private static final String MEM_TAG = "mem";
- private static final String CPU_TAG = "cpu";
- private static final String HOSTNAME_TAG = "hostName";
- private static final String JVM_TAG = "jvm";
- private static final String NODEID_TAG = "nodeId";
- private static final String PORT_TAG = "port";
-
- public WorkerAssignment(String nodeId, Number port) {
- super(nodeId, port);
- // TODO Auto-generated constructor stub
- }
-
- public WorkerAssignment() {
-
- }
-
- public void addComponent(String compenentName, Integer num) {
- componentToNum.put(compenentName, num);
- }
-
- public Map<String, Integer> getComponentToNum() {
- return componentToNum;
- }
-
- public String getHostName() {
- return hostName;
- }
-
- public void setHostName(String hostName) {
- this.hostName = hostName;
- }
-
- public void setJvm(String jvm) {
- this.jvm = jvm;
- }
-
- public String getJvm() {
- return jvm;
- }
-
- public long getMem() {
- return mem;
- }
-
- public void setMem(long mem) {
- this.mem = mem;
- }
-
- public int getCpu() {
- return cpu;
- }
-
- public void setCpu(int cpu) {
- this.cpu = cpu;
- }
-
- @Override
- public String toJSONString() {
-// StringBuilder sb = new StringBuilder();
-
-// sb.append("[");
-// sb.append("\"" + this.getNodeId() + "\"");
-// sb.append(",");
-// sb.append("\"" + this.hostName + "\"");
-// sb.append(",");
-// sb.append("\"" + String.valueOf(this.getPort()) + "\"");
-// sb.append(",");
-// sb.append("\"" + this.jvm + "\"");
-// sb.append(",");
-// sb.append("\"" + String.valueOf(this.mem) + "\"");
-// sb.append(",");
-// sb.append("\"" + String.valueOf(this.cpu) + "\"");
-// sb.append(",");
-// sb.append("{");
-// for (Entry<String, Integer> entry : componentToNum.entrySet()) {
-// sb.append("\"" + entry.getKey() + "\":");
-// sb.append("\"" + String.valueOf(entry.getValue()) + "\"");
-// sb.append(",");
-// }
-// sb.append("}");
-// sb.append("]");
-
-
-
- Map<String, String> map = new HashMap<String, String>();
-
- map.put(COMPONENTTONUM_TAG, Utils.to_json(componentToNum));
- map.put(MEM_TAG, String.valueOf(mem));
- map.put(CPU_TAG, String.valueOf(cpu));
- map.put(HOSTNAME_TAG, hostName);
- map.put(JVM_TAG, jvm);
- map.put(NODEID_TAG, getNodeId());
- map.put(PORT_TAG, String.valueOf(getPort()));
-
-
- return Utils.to_json(map);
- }
-
- public static WorkerAssignment parseFromObj(Object obj) {
- if (obj == null) {
- return null;
- }
-
- if (obj instanceof Map == false) {
- return null;
- }
-
- try {
- Map<String, String> map = (Map<String, String>)obj;
-
- String supervisorId = map.get(NODEID_TAG);
- String hostname = map.get(HOSTNAME_TAG);
- Integer port = JStormUtils.parseInt(map.get(PORT_TAG));
- String jvm = map.get(JVM_TAG);
- Long mem = JStormUtils.parseLong(map.get(MEM_TAG));
- Integer cpu = JStormUtils.parseInt(map.get(CPU_TAG));
- Map<String, Object> componentToNum = (Map<String, Object>)Utils.from_json(map.get(COMPONENTTONUM_TAG));
-
- WorkerAssignment ret = new WorkerAssignment(supervisorId, port);
-
-
- ret.hostName = hostname;
- ret.setNodeId(supervisorId);
- ret.setJvm(jvm);
- if (port != null) {
- ret.setPort(port);
- }
- if (mem != null) {
- ret.setMem(mem);
- }
- if (cpu != null) {
- ret.setCpu(cpu);
- }
-
- for (Entry<String, Object> entry : componentToNum.entrySet()) {
- ret.addComponent(entry.getKey(),
- JStormUtils.parseInt(entry.getValue()));
- }
- return ret;
- } catch (Exception e) {
- LOG.error("Failed to convert to WorkerAssignment, raw:" + obj, e);
- return null;
- }
-
- }
-
- public static String getStringFromJson(String text) {
- return text.equals("null") ? null : text;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result
- + ((componentToNum == null) ? 0 : componentToNum.hashCode());
- result = prime * result + cpu;
- result = prime * result
- + ((hostName == null) ? 0 : hostName.hashCode());
- result = prime * result + ((jvm == null) ? 0 : jvm.hashCode());
- result = prime * result + (int) (mem ^ (mem >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (!super.equals(obj))
- return false;
- if (getClass() != obj.getClass())
- return false;
- WorkerAssignment other = (WorkerAssignment) obj;
- if (componentToNum == null) {
- if (other.componentToNum != null)
- return false;
- } else if (!componentToNum.equals(other.componentToNum))
- return false;
- if (cpu != other.cpu)
- return false;
- if (hostName == null) {
- if (other.hostName != null)
- return false;
- } else if (!hostName.equals(other.hostName))
- return false;
- if (jvm == null) {
- if (other.jvm != null)
- return false;
- } else if (!jvm.equals(other.jvm))
- return false;
- if (mem != other.mem)
- return false;
- return true;
- }
-
- public static void main(String[] args) {
- WorkerAssignment input = new WorkerAssignment();
-
- input.setJvm("sb");
-
- input.setCpu(1);
-
- input.setMem(2);
-
- input.addComponent("2b", 2);
-
- String outString = Utils.to_json(input);
-
- System.out.println(input);
-
- //String outString = "[componentToNum={},mem=1610612736,cpu=1,hostName=mobilejstorm-60-1,jvm=<null>,nodeId=<null>,port=0]";
-
- Object object = Utils.from_json(outString);
- System.out.println(object);
-
- System.out.println(parseFromObj(object));
-
- System.out.print(input.equals(parseFromObj(object)));
- }
-
-
-}
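Note on the removed WorkerAssignment class: toJSONString flattens the fields into a map of strings, and parseFromObj rebuilds an instance from such a map, returning null when the input is not a map or cannot be parsed. The sketch below illustrates that round trip without the JSON layer. AssignmentMapSketch and its helper names are hypothetical, only a subset of the fields is modelled, and the real class additionally depends on WorkerSlot, Utils.to_json and JStormUtils, none of which are reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not part of the JStorm code base.
class AssignmentMapSketch {
    String nodeId;
    String hostName;
    int port;
    long mem;
    int cpu;

    // Flatten the fields into a string map, as toJSONString does before encoding.
    Map<String, String> toMap() {
        Map<String, String> map = new HashMap<String, String>();
        map.put("nodeId", nodeId);
        map.put("hostName", hostName);
        map.put("port", String.valueOf(port));
        map.put("mem", String.valueOf(mem));
        map.put("cpu", String.valueOf(cpu));
        return map;
    }

    // Rebuild an instance; returns null for non-map input or unparsable numbers.
    static AssignmentMapSketch fromMap(Object obj) {
        if (!(obj instanceof Map)) {
            return null; // also covers obj == null
        }
        Map<?, ?> map = (Map<?, ?>) obj;
        try {
            AssignmentMapSketch ret = new AssignmentMapSketch();
            ret.nodeId = asString(map.get("nodeId"));
            ret.hostName = asString(map.get("hostName"));
            // Numeric fields keep their zero defaults when the key is absent.
            if (map.get("port") != null) {
                ret.port = Integer.parseInt(map.get("port").toString());
            }
            if (map.get("mem") != null) {
                ret.mem = Long.parseLong(map.get("mem").toString());
            }
            if (map.get("cpu") != null) {
                ret.cpu = Integer.parseInt(map.get("cpu").toString());
            }
            return ret;
        } catch (NumberFormatException e) {
            return null;
        }
    }

    private static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        AssignmentMapSketch in = new AssignmentMapSketch();
        in.nodeId = "supervisor-1";
        in.port = 6800;
        in.mem = 2L * 1024 * 1024 * 1024;
        in.cpu = 1;
        AssignmentMapSketch out = fromMap(in.toMap());
        System.out.println(out.nodeId + ":" + out.port + " mem=" + out.mem + " cpu=" + out.cpu);
    }
}
```

Checking each numeric field for null before parsing lets a partially filled map still produce a usable object, which is the intent of the null checks in the removed parseFromObj.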
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricCallback.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricCallback.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricCallback.java
deleted file mode 100644
index 964913e..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricCallback.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package com.alibaba.jstorm.client.metric;
-
-import com.codahale.metrics.Metric;
-
-public interface MetricCallback<T extends Metric> {
- void callback(T metric);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricClient.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricClient.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricClient.java
deleted file mode 100644
index becc365..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/metric/MetricClient.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.alibaba.jstorm.client.metric;
-
-import backtype.storm.task.TopologyContext;
-
-import com.alibaba.jstorm.metric.Metrics;
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.alibaba.jstorm.metric.JStormTimer;
-import com.alibaba.jstorm.metric.JStormHistogram;
-
-public class MetricClient {
-
- private final int taskid;
-
- public MetricClient(TopologyContext context) {
- taskid = context.getThisTaskId();
- }
-
- private String getMetricName(Integer taskid, String name) {
- return "task-" + String.valueOf(taskid) + ":" + name;
- }
-
- public Gauge<?> registerGauge(String name, Gauge<?> gauge, MetricCallback<Gauge<?>> callback) {
- String userMetricName = getMetricName(taskid, name);
- Gauge<?> ret = Metrics.registerGauge(userMetricName, gauge);
- Metrics.registerUserDefine(userMetricName, gauge, callback);
- return ret;
- }
-
- public Counter registerCounter(String name, MetricCallback<Counter> callback) {
- String userMetricName = getMetricName(taskid, name);
- Counter ret = Metrics.registerCounter(userMetricName);
- Metrics.registerUserDefine(userMetricName, ret, callback);
- return ret;
- }
-
- public Meter registerMeter(String name, MetricCallback<Meter> callback) {
- String userMetricName = getMetricName(taskid, name);
- Meter ret = Metrics.registerMeter(userMetricName);
- Metrics.registerUserDefine(userMetricName, ret, callback);
- return ret;
- }
-
- public JStormTimer registerTimer(String name, MetricCallback<Timer> callback) {
- String userMetricName = getMetricName(taskid, name);
- JStormTimer ret = Metrics.registerTimer(userMetricName);
- Metrics.registerUserDefine(userMetricName, ret, callback);
- return ret;
- }
-
- public JStormHistogram registerHistogram(String name, MetricCallback<Histogram> callback) {
- String userMetricName = getMetricName(taskid, name);
- JStormHistogram ret = Metrics.registerHistograms(userMetricName);
- Metrics.registerUserDefine(userMetricName, ret, callback);
- return ret;
- }
-
- public boolean unregister(String name, Integer taskid) {
- String userMetricName = getMetricName(taskid, name);
- return Metrics.unregisterUserDefine(userMetricName);
- }
-
-}
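Note on the removed MetricClient class: every user metric is registered under a name of the form "task-<taskid>:<name>", so the same metric name can be used by several tasks without collisions, and unregister rebuilds the same key to remove the entry. The sketch below shows that naming scheme with a plain in-memory registry. TaskMetricNameSketch is hypothetical, and AtomicLong merely stands in for the com.codahale.metrics.Counter used by the real code; the actual Metrics registry is not shown in this diff.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; not part of the JStorm code base.
class TaskMetricNameSketch {
    // Stand-in registry keyed by the task-scoped metric name.
    private static final ConcurrentMap<String, AtomicLong> REGISTRY =
            new ConcurrentHashMap<String, AtomicLong>();

    // Same format as the removed MetricClient.getMetricName.
    static String metricName(int taskId, String name) {
        return "task-" + taskId + ":" + name;
    }

    // Returns the existing counter if the name is already registered.
    static AtomicLong registerCounter(int taskId, String name) {
        AtomicLong fresh = new AtomicLong();
        AtomicLong existing = REGISTRY.putIfAbsent(metricName(taskId, name), fresh);
        return existing != null ? existing : fresh;
    }

    // Returns true only if a metric was actually removed.
    static boolean unregister(int taskId, String name) {
        return REGISTRY.remove(metricName(taskId, name)) != null;
    }

    public static void main(String[] args) {
        AtomicLong a = registerCounter(7, "qps");
        AtomicLong b = registerCounter(8, "qps"); // same name, different task
        a.incrementAndGet();
        System.out.println(metricName(7, "qps") + " = " + a.get());
        System.out.println(metricName(8, "qps") + " = " + b.get());
        System.out.println(unregister(7, "qps"));
        System.out.println(unregister(7, "qps"));
    }
}
```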
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java
deleted file mode 100644
index f140098..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.alibaba.jstorm.client.spout;
-
-import java.util.List;
-
-/**
- * This interface passes the emitted values back to the spout when a tuple succeeds.
- *
- * If a spout implements this interface,
- * ISpout.ack() will not be called when a tuple succeeds.
- *
- * @author longda
- */
-public interface IAckValueSpout {
- void ack(Object msgId, List<Object> values);
-}