You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:37 UTC

[58/60] [abbrv] storm git commit: remove jstorm-utility directory

remove jstorm-utility directory


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e8f64d5e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e8f64d5e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e8f64d5e

Branch: refs/heads/jstorm-import
Commit: e8f64d5e88a8f5c29c1633104d06de0970d57676
Parents: e1f6844
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Nov 5 15:23:13 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Nov 5 15:23:13 2015 -0500

----------------------------------------------------------------------
 .gitmodules                                     |   3 -
 jstorm-utility/jstorm-kafka/.gitignore          |   1 -
 jstorm-utility/jstorm-kafka/README.md           |   0
 jstorm-utility/jstorm-kafka/pom.xml             | 146 --------
 .../java/com/alibaba/jstorm/kafka/Host.java     |  57 ----
 .../com/alibaba/jstorm/kafka/KafkaConsumer.java | 241 -------------
 .../alibaba/jstorm/kafka/KafkaMessageId.java    |  27 --
 .../com/alibaba/jstorm/kafka/KafkaSpout.java    | 124 -------
 .../alibaba/jstorm/kafka/KafkaSpoutConfig.java  | 130 -------
 .../alibaba/jstorm/kafka/PartitionConsumer.java | 227 -------------
 .../jstorm/kafka/PartitionCoordinator.java      |  49 ---
 .../java/com/alibaba/jstorm/kafka/ZkState.java  |  95 ------
 .../alibaba/jstorm/test/kafka/KafkaTest.java    |  57 ----
 jstorm-utility/jstorm-rocket-mq/pom.xml         |  94 -----
 .../alibaba/aloha/meta/MetaClientConfig.java    | 263 --------------
 .../alibaba/aloha/meta/MetaConsumerFactory.java | 109 ------
 .../java/com/alibaba/aloha/meta/MetaSpout.java  | 248 --------------
 .../java/com/alibaba/aloha/meta/MetaTuple.java  |  90 -----
 .../aloha/meta/example/TestTopology.java        | 150 --------
 .../alibaba/aloha/meta/example/WriterBolt.java  |  59 ----
 .../target/classes/META-INF/MANIFEST.MF         |   5 -
 .../com.alibaba.jstorm/metaspout/pom.properties |   7 -
 .../maven/com.alibaba.jstorm/metaspout/pom.xml  |  94 -----
 .../test/main/resources/metaspout.yaml          |  32 --
 jstorm-utility/ons/conf/ons.yaml                |  49 ---
 jstorm-utility/ons/pom.xml                      | 101 ------
 .../java/com/alibaba/jstorm/LoadConfig.java     |  67 ----
 .../java/com/alibaba/jstorm/TestTopology.java   |  80 -----
 .../java/com/alibaba/jstorm/ons/OnsConfig.java  |  69 ----
 .../java/com/alibaba/jstorm/ons/OnsTuple.java   |  80 -----
 .../jstorm/ons/consumer/ConsumerConfig.java     |  65 ----
 .../jstorm/ons/consumer/ConsumerFactory.java    |  49 ---
 .../jstorm/ons/consumer/ConsumerSpout.java      | 268 ---------------
 .../jstorm/ons/producer/ProducerBolt.java       |  94 -----
 .../jstorm/ons/producer/ProducerConfig.java     |  29 --
 .../jstorm/ons/producer/ProducerFactory.java    |  59 ----
 .../ons/test/main/resources/metaspout.yaml      |  32 --
 jstorm-utility/rocket-mq                        |   1 -
 jstorm-utility/topology-monitor/.gitignore      |  13 -
 jstorm-utility/topology-monitor/README.md       |   2 -
 jstorm-utility/topology-monitor/pom.xml         | 110 ------
 .../cosmos/BlackholeBlockingQueueSpout.java     | 114 -------
 .../com/dianping/cosmos/BlackholeSpout.java     | 101 ------
 .../com/dianping/cosmos/MessageFetcher.java     |  50 ---
 .../java/com/dianping/cosmos/PumaSpout.java     | 194 -----------
 .../java/com/dianping/cosmos/RedisSinkBolt.java | 167 ---------
 .../main/java/com/dianping/cosmos/Updater.java  |   9 -
 .../cosmos/metric/CatMetricsConsumer.java       |  70 ----
 .../dianping/cosmos/monitor/HttpCatClient.java  |  57 ----
 .../cosmos/monitor/HttpClientService.java       | 120 -------
 .../dianping/cosmos/monitor/SpoutCounter.java   |  24 --
 .../cosmos/monitor/TopologyMonitor.java         |  90 -----
 .../monitor/topology/ClusterInfoBolt.java       | 170 ----------
 .../monitor/topology/ClusterInfoTopology.java   |  18 -
 .../com/dianping/cosmos/util/CatClient.java     |  19 --
 .../com/dianping/cosmos/util/CatMetricUtil.java |  45 ---
 .../com/dianping/cosmos/util/Constants.java     |   9 -
 .../java/com/dianping/cosmos/util/JSONUtil.java | 125 -------
 .../com/dianping/cosmos/util/TupleHelpers.java  |  33 --
 .../transaction_meta_spout/conf/topology.yaml   |  21 --
 jstorm-utility/transaction_meta_spout/pom.xml   |  68 ----
 .../batch/example/BatchMetaRebalance.java       | 108 ------
 .../jstorm/batch/example/BatchMetaSpout.java    | 131 -------
 .../jstorm/batch/example/BatchMetaTopology.java | 163 ---------
 .../alibaba/jstorm/batch/example/CountBolt.java |  84 -----
 .../alibaba/jstorm/batch/example/DBBolt.java    | 261 --------------
 .../jstorm/batch/example/TransformBolt.java     |  63 ----
 .../jstorm/batch/meta/MetaSimpleClient.java     | 340 -------------------
 .../jstorm/batch/meta/MetaSpoutConfig.java      | 119 -------
 .../src/main/resources/metaspout.default.prop   |  15 -
 70 files changed, 6264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/.gitmodules
----------------------------------------------------------------------
diff --git a/.gitmodules b/.gitmodules
index c68bb15..e69de29 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +0,0 @@
-[submodule "jstorm-utility/rocket-mq"]
-	path = jstorm-utility/rocket-mq
-	url = https://github.com/rocketmq/rocketmq-storm

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/.gitignore
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/.gitignore b/jstorm-utility/jstorm-kafka/.gitignore
deleted file mode 100644
index b83d222..0000000
--- a/jstorm-utility/jstorm-kafka/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/README.md
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/README.md b/jstorm-utility/jstorm-kafka/README.md
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/pom.xml b/jstorm-utility/jstorm-kafka/pom.xml
deleted file mode 100755
index df7c0ea..0000000
--- a/jstorm-utility/jstorm-kafka/pom.xml
+++ /dev/null
@@ -1,146 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<parent>
-		<groupId>com.alibaba.jstorm</groupId>
-		<artifactId>jstorm-all</artifactId>
-		<version>0.9.6.2</version>
-		<relativePath>../..</relativePath>
-	</parent>
-
-	<modelVersion>4.0.0</modelVersion>
-	<artifactId>jstorm-kafka</artifactId>
-	<packaging>jar</packaging>
-
-	<name>${project.artifactId}-${project.version}</name>
-	<description>jstorm kafka</description>
-
-	<url>http://maven.apache.org</url>
-
-	<properties>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<jstorm.version>${parent.version}</jstorm.version>
-		<kafka.version>0.8.1</kafka.version>
-		<curator.version>1.3.2</curator.version>
-	</properties>
-
-	<dependencies>
-
-
-		<dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-client-extension</artifactId>
-			<version>${jstorm.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-
-		<dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-client</artifactId>
-			<version>${jstorm.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-server</artifactId>
-			<version>${jstorm.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka_2.9.2</artifactId>
-			<version>${kafka.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.zookeeper</groupId>
-					<artifactId>zookeeper</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>junit</groupId>
-					<artifactId>junit</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.zookeeper</groupId>
-			<artifactId>zookeeper</artifactId>
-			<version>3.4.5</version>
-			<exclusions>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<dependency>
-			<groupId>com.netflix.curator</groupId>
-			<artifactId>curator-framework</artifactId>
-			<version>${curator.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-
-
-		<dependency>
-			<groupId>com.netflix.curator</groupId>
-			<artifactId>curator-recipes</artifactId>
-			<version>${curator.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-			</exclusions>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.netflix.curator</groupId>
-			<artifactId>curator-test</artifactId>
-			<version>${curator.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.testng</groupId>
-					<artifactId>testng</artifactId>
-				</exclusion>
-			</exclusions>
-			<scope>test</scope>
-		</dependency>
-
-
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
-					<encoding>${project.build.sourceEncoding}</encoding>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/Host.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/Host.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/Host.java
deleted file mode 100644
index 36227a0..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/Host.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-import java.io.Serializable;
-/**
- * 
- * @author feilaoda
- *
- */
-public class Host implements Serializable {
-    /**
-     * 
-     */
-    private static final long serialVersionUID = -315213440689707962L;
-    private String host;
-    private int port;
-
-    public Host(String host) {
-        this(host, 9092);
-    }
-
-    public Host(String host, int port) {
-        this.host = host;
-        this.port = port;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public void setPort(int port) {
-        this.port = port;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        if (obj instanceof Host) {
-            final Host other = (Host) obj;
-            return this.host.equals(other.host) && this.port == other.port;
-        } else {
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
deleted file mode 100644
index 787b285..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
+++ /dev/null
@@ -1,241 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.common.KafkaException;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-
-/**
- * 
- * @author feilaoda
- *
- */
-public class KafkaConsumer {
-
-    private static Logger LOG = Logger.getLogger(KafkaConsumer.class);
-
-    public static final int NO_OFFSET = -1;
-
-    private int status;
-    private SimpleConsumer consumer = null;
-
-    private KafkaSpoutConfig config;
-    private LinkedList<Host> brokerList;
-    private int brokerIndex;
-    private Broker leaderBroker;
-
-    public KafkaConsumer(KafkaSpoutConfig config) {
-        this.config = config;
-        this.brokerList = new LinkedList<Host>(config.brokers);
-        this.brokerIndex = 0;
-    }
-
-    public ByteBufferMessageSet fetchMessages(int partition, long offset) throws IOException {
-
-        String topic = config.topic;
-        FetchRequest req = new FetchRequestBuilder().clientId(config.clientId).addFetch(topic, partition, offset, config.fetchMaxBytes)
-                .maxWait(config.fetchWaitMaxMs).build();
-        FetchResponse fetchResponse = null;
-        SimpleConsumer simpleConsumer = null;
-        try {
-            simpleConsumer = findLeaderConsumer(partition);
-            if (simpleConsumer == null) {
-                // LOG.error(message);
-                return null;
-            }
-            fetchResponse = simpleConsumer.fetch(req);
-        } catch (Exception e) {
-            if (e instanceof ConnectException || e instanceof SocketTimeoutException || e instanceof IOException
-                    || e instanceof UnresolvedAddressException) {
-                LOG.warn("Network error when fetching messages:", e);
-                if (simpleConsumer != null) {
-                    String host = simpleConsumer.host();
-                    int port = simpleConsumer.port();
-                    simpleConsumer = null;
-                    throw new KafkaException("Network error when fetching messages: " + host + ":" + port + " , " + e.getMessage(), e);
-                }
-
-            } else {
-                throw new RuntimeException(e);
-            }
-        }
-        if (fetchResponse.hasError()) {
-            short code = fetchResponse.errorCode(topic, partition);
-            if (code == ErrorMapping.OffsetOutOfRangeCode() && config.resetOffsetIfOutOfRange) {
-                long startOffset = getOffset(topic, partition, config.startOffsetTime);
-                offset = startOffset;
-            }
-            if(leaderBroker != null) {
-                LOG.error("fetch data from kafka topic[" + config.topic + "] host[" + leaderBroker.host() + ":" + leaderBroker.port() + "] partition["
-                    + partition + "] error:" + code);
-            }else {
-                
-            }
-            return null;
-        } else {
-            ByteBufferMessageSet msgs = fetchResponse.messageSet(topic, partition);
-            return msgs;
-        }
-    }
-
-    private SimpleConsumer findLeaderConsumer(int partition) {
-        try {
-            if (consumer != null) {
-                return consumer;
-            }
-            PartitionMetadata metadata = findLeader(partition);
-            if (metadata == null) {
-                leaderBroker = null;
-                consumer = null;
-                return null;
-            }
-            leaderBroker = metadata.leader();
-            consumer = new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), config.socketTimeoutMs, config.socketReceiveBufferBytes,
-                    config.clientId);
-
-            return consumer;
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-        return null;
-    }
-
-    protected PartitionMetadata findLeader(int partition) {
-        PartitionMetadata returnMetaData = null;
-        int errors = 0;
-        int size = brokerList.size();
-
-        Host brokerHost = brokerList.get(brokerIndex);
-        try {
-            if (consumer == null) {
-                consumer = new SimpleConsumer(brokerHost.getHost(), brokerHost.getPort(), config.socketTimeoutMs, config.socketReceiveBufferBytes,
-                        config.clientId);
-            }
-        } catch (Exception e) {
-            LOG.warn(e.getMessage(), e);
-            consumer = null;
-        }
-        int i = brokerIndex;
-        loop: while (i < size && errors < size + 1) {
-            Host host = brokerList.get(i);
-            i = (i + 1) % size;
-            brokerIndex = i; // next index
-            try {
-
-                if (consumer == null) {
-                    consumer = new SimpleConsumer(host.getHost(), host.getPort(), config.socketTimeoutMs, config.socketReceiveBufferBytes,
-                            config.clientId);
-                }
-                List<String> topics = Collections.singletonList(config.topic);
-                TopicMetadataRequest req = new TopicMetadataRequest(topics);
-                kafka.javaapi.TopicMetadataResponse resp = null;
-                try {
-                    resp = consumer.send(req);
-                } catch (Exception e) {
-                    errors += 1;
-
-                    LOG.error("findLeader error, broker:" + host.toString() + ", will change to next broker index:" + (i + 1) % size);
-                    if (consumer != null) {
-                        consumer.close();
-                        consumer = null;
-                    }
-                    continue;
-                }
-
-                List<TopicMetadata> metaData = resp.topicsMetadata();
-                for (TopicMetadata item : metaData) {
-                    for (PartitionMetadata part : item.partitionsMetadata()) {
-                        if (part.partitionId() == partition) {
-                            returnMetaData = part;
-                            break loop;
-                        }
-                    }
-                }
-
-            } catch (Exception e) {
-                LOG.error("Error communicating with Broker:" + host.toString() + ", find Leader for partition:" + partition);
-            } finally {
-                if (consumer != null) {
-                    consumer.close();
-                    consumer = null;
-                }
-            }
-        }
-
-        return returnMetaData;
-    }
-
-    public long getOffset(String topic, int partition, long startOffsetTime) {
-        SimpleConsumer simpleConsumer = findLeaderConsumer(partition);
-
-        if (simpleConsumer == null) {
-            LOG.error("Error consumer is null get offset from partition:" + partition);
-            return -1;
-        }
-
-        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
-        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
-
-        long[] offsets = simpleConsumer.getOffsetsBefore(request).offsets(topic, partition);
-        if (offsets.length > 0) {
-            return offsets[0];
-        } else {
-            return NO_OFFSET;
-        }
-    }
-
-    public void close() {
-        if (consumer != null) {
-            consumer.close();
-        }
-    }
-
-    public SimpleConsumer getConsumer() {
-        return consumer;
-    }
-
-    public void setConsumer(SimpleConsumer consumer) {
-        this.consumer = consumer;
-    }
-
-    public int getStatus() {
-        return status;
-    }
-
-    public void setStatus(int status) {
-        this.status = status;
-    }
-
-    public Broker getLeaderBroker() {
-        return leaderBroker;
-    }
-
-    public void setLeaderBroker(Broker leaderBroker) {
-        this.leaderBroker = leaderBroker;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaMessageId.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaMessageId.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaMessageId.java
deleted file mode 100644
index a7fe8ca..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaMessageId.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-public class KafkaMessageId {
-	private int partition;
-	private long offset;
-    
-    public KafkaMessageId(int partition, long offset) {
-        this.setPartition(partition);
-        this.setOffset(offset);
-    }
-
-	public int getPartition() {
-		return partition;
-	}
-
-	public void setPartition(int partition) {
-		this.partition = partition;
-	}
-
-	public long getOffset() {
-		return offset;
-	}
-
-	public void setOffset(long offset) {
-		this.offset = offset;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
deleted file mode 100644
index 4fa11fa..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.kafka.PartitionConsumer.EmitState;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-
-public class KafkaSpout implements IRichSpout {
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = 1L;
-	private static Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-
-	protected SpoutOutputCollector collector;
-	
-	private long lastUpdateMs;
-	PartitionCoordinator coordinator;
-	
-	private KafkaSpoutConfig config;
-	
-	private ZkState zkState;
-	
-	public KafkaSpout() {
-	    config = new KafkaSpoutConfig();
-	}
-	
-	public KafkaSpout(KafkaSpoutConfig config) {
-		this.config = config;
-	}
-	
-	@Override
-	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-		// TODO Auto-generated method stub
-		this.collector = collector;
-		config.configure(conf);
-		zkState = new ZkState(conf, config);
-		coordinator = new PartitionCoordinator(conf, config, context, zkState);
-		lastUpdateMs = System.currentTimeMillis();
-	}
-
-	@Override
-	public void close() {
-		// TODO Auto-generated method stub
-	    zkState.close();
-	}
-
-	@Override
-	public void activate() {
-		// TODO Auto-generated method stub
-
-	}
-
-	@Override
-	public void deactivate() {
-		// TODO Auto-generated method stub
-		
-	}
-
-	@Override
-	public void nextTuple() {
-		Collection<PartitionConsumer> partitionConsumers = coordinator.getPartitionConsumers();
-		for(PartitionConsumer consumer: partitionConsumers) {
-			EmitState state = consumer.emit(collector);
-			LOG.debug("====== partition "+ consumer.getPartition() + " emit message state is "+state);
-//			if(state != EmitState.EMIT_MORE) {
-//				currentPartitionIndex  = (currentPartitionIndex+1) % consumerSize;
-//			}
-//			if(state != EmitState.EMIT_NONE) {
-//				break;
-//			}
-		}
-		long now = System.currentTimeMillis();
-        if((now - lastUpdateMs) > config.offsetUpdateIntervalMs) {
-            commitState();
-        }
-        
-		
-	}
-	
-	public void commitState() {
-	    lastUpdateMs = System.currentTimeMillis();
-		for(PartitionConsumer consumer: coordinator.getPartitionConsumers()) {
-			consumer.commitState();
-        }
-		
-	}
-
-	@Override
-	public void ack(Object msgId) {
-		KafkaMessageId messageId = (KafkaMessageId)msgId;
-		PartitionConsumer consumer = coordinator.getConsumer(messageId.getPartition());
-		consumer.ack(messageId.getOffset());
-	}
-
-	@Override
-	public void fail(Object msgId) {
-		KafkaMessageId messageId = (KafkaMessageId)msgId;
-		PartitionConsumer consumer = coordinator.getConsumer(messageId.getPartition());
-		consumer.fail(messageId.getOffset());
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("bytes"));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-	
-	
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpoutConfig.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpoutConfig.java
deleted file mode 100644
index 86b6e69..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpoutConfig.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-import backtype.storm.spout.MultiScheme;
-import backtype.storm.spout.RawMultiScheme;
-
-
-public class KafkaSpoutConfig implements Serializable {
-
-	
-	private static final long serialVersionUID = 1L;
-
-	public List<Host> brokers;
-	public int numPartitions;
-	public String topic;
-	public String zkRoot;
-	
-	public List<Host> zkServers;
-	
-	public int fetchMaxBytes = 256*1024;
-	public int fetchWaitMaxMs = 10000;
-    public int socketTimeoutMs = 30 * 1000;
-    public int socketReceiveBufferBytes = 64*1024;
-    public long startOffsetTime = -1;
-    public boolean fromBeginning = false;
-    public String clientId;
-    public boolean resetOffsetIfOutOfRange = false;
-    public long offsetUpdateIntervalMs=2000;
-    private Properties properties = null;
-    private Map stormConf;
-    public int batchSendCount = 1;
-    
-    public KafkaSpoutConfig() {
-    }
-    
-    public KafkaSpoutConfig(Properties properties) {
-        this.properties = properties;
-    }
-    
-    public void configure(Map conf) {
-        this.stormConf = conf;
-        topic = getConfig("kafka.topic", "jstorm");
-        zkRoot = getConfig("storm.zookeeper.root", "/jstorm");
-        
-        String zkHosts = getConfig("kafka.zookeeper.hosts", "127.0.0.1:2181");
-        zkServers = convertHosts(zkHosts, 2181);
-        String brokerHosts = getConfig("kafka.broker.hosts", "127.0.0.1:9092");
-        brokers = convertHosts(brokerHosts, 9092);
-        
-        numPartitions = JStormUtils.parseInt(getConfig("kafka.broker.partitions"), 1);
-        fetchMaxBytes = JStormUtils.parseInt(getConfig("kafka.fetch.max.bytes"), 256*1024);
-        fetchWaitMaxMs = JStormUtils.parseInt(getConfig("kafka.fetch.wait.max.ms"), 10000);
-        socketTimeoutMs = JStormUtils.parseInt(getConfig("kafka.socket.timeout.ms"), 30 * 1000);
-        socketReceiveBufferBytes = JStormUtils.parseInt(getConfig("kafka.socket.receive.buffer.bytes"), 64*1024);
-        fromBeginning = JStormUtils.parseBoolean(getConfig("kafka.fetch.from.beginning"), false);
-        startOffsetTime = JStormUtils.parseInt(getConfig("kafka.start.offset.time"), -1);
-        offsetUpdateIntervalMs = JStormUtils.parseInt(getConfig("kafka.offset.update.interval.ms"), 2000);
-        clientId = getConfig("kafka.client.id", "jstorm");
-        batchSendCount = JStormUtils.parseInt(getConfig("kafka.spout.batch.send.count"), 1);
-    }
-    
-    
-      private String getConfig(String key) {
-          return getConfig(key, null);
-      }
-        
-	  private String getConfig(String key, String defaultValue) {
-	      if(properties!=null && properties.containsKey(key)) {
-	          return properties.getProperty(key);
-	      }else if(stormConf.containsKey(key)) {
-	          return String.valueOf(stormConf.get(key));
-	      }else {
-	          return defaultValue;
-	      }
-	  }
-
-	
-	public  List<Host> convertHosts(String hosts, int defaultPort) {
-	    List<Host> hostList = new ArrayList<Host>();
-	    String[] hostArr = hosts.split(",");
-        for (String s : hostArr) {
-            Host host;
-            String[] spec = s.split(":");
-            if (spec.length == 1) {
-                host = new Host(spec[0],defaultPort);
-            } else if (spec.length == 2) {
-                host = new Host(spec[0], JStormUtils.parseInt(spec[1]));
-            } else {
-                throw new IllegalArgumentException("Invalid host specification: " + s);
-            }
-            hostList.add(host);
-        }
-        return hostList;
-    }
-	
-
-	public List<Host> getHosts() {
-		return brokers;
-	}
-
-	public void setHosts(List<Host> hosts) {
-		this.brokers = hosts;
-	}
-
-	public int getPartitionsPerBroker() {
-		return numPartitions;
-	}
-
-	public void setPartitionsPerBroker(int partitionsPerBroker) {
-		this.numPartitions = partitionsPerBroker;
-	}
-
-	public String getTopic() {
-		return topic;
-	}
-
-	public void setTopic(String topic) {
-		this.topic = topic;
-	}
-
-	
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
deleted file mode 100644
index 4b8ad7f..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
+++ /dev/null
@@ -1,227 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableMap;
-
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.utils.Utils;
-
-/**
- * 
- * @author feilaoda
- *
- */
-public class PartitionConsumer {
-    private static Logger LOG = LoggerFactory.getLogger(PartitionConsumer.class);
-
-    static enum EmitState {
-        EMIT_MORE, EMIT_END, EMIT_NONE
-    }
-
-    private int partition;
-    private KafkaConsumer consumer;
-   
-
-    private PartitionCoordinator coordinator;
-
-    private KafkaSpoutConfig config;
-    private LinkedList<MessageAndOffset> emittingMessages = new LinkedList<MessageAndOffset>();
-    private SortedSet<Long> pendingOffsets = new TreeSet<Long>();
-    private SortedSet<Long> failedOffsets = new TreeSet<Long>();
-    private long emittingOffset;
-    private long lastCommittedOffset;
-    private ZkState zkState;
-    private Map stormConf;
-
-    public PartitionConsumer(Map conf, KafkaSpoutConfig config, int partition, ZkState offsetState) {
-        this.stormConf = conf;
-        this.config = config;
-        this.partition = partition;
-        this.consumer = new KafkaConsumer(config);
-        this.zkState = offsetState;
-
-        Long jsonOffset = null;
-        try {
-            Map<Object, Object> json = offsetState.readJSON(zkPath());
-            if (json != null) {
-                // jsonTopologyId = (String)((Map<Object,Object>)json.get("topology"));
-                jsonOffset = (Long) json.get("offset");
-            }
-        } catch (Throwable e) {
-            LOG.warn("Error reading and/or parsing at ZkNode: " + zkPath(), e);
-        }
-
-        try {
-            if (config.fromBeginning) {
-                emittingOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.EarliestTime());
-            } else {
-                if (jsonOffset == null) {
-                    lastCommittedOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.LatestTime());
-                } else {
-                    lastCommittedOffset = jsonOffset;
-                }
-                emittingOffset = lastCommittedOffset;
-            }
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    public EmitState emit(SpoutOutputCollector collector) {
-        if (emittingMessages.isEmpty()) {
-            fillMessages();
-        }
-
-        int count = 0;
-        while (true) {
-            MessageAndOffset toEmitMsg = emittingMessages.pollFirst();
-            if (toEmitMsg == null) {
-                return EmitState.EMIT_END;
-            }
-            count ++;
-            Iterable<List<Object>> tups = generateTuples(toEmitMsg.message());
-
-            if (tups != null) {
-                for (List<Object> tuple : tups) {
-                    LOG.debug("emit message {}", new String(Utils.toByteArray(toEmitMsg.message().payload())));
-                    collector.emit(tuple, new KafkaMessageId(partition, toEmitMsg.offset()));
-                }
-                if(count>=config.batchSendCount) {
-                    break;
-                }
-            } else {
-                ack(toEmitMsg.offset());
-            }
-        }
-
-        if (emittingMessages.isEmpty()) {
-            return EmitState.EMIT_END;
-        } else {
-            return EmitState.EMIT_MORE;
-        }
-    }
-
-    private void fillMessages() {
-
-        ByteBufferMessageSet msgs;
-        try {
-            long start = System.currentTimeMillis();
-            msgs = consumer.fetchMessages(partition, emittingOffset + 1);
-            
-            if (msgs == null) {
-                LOG.error("fetch null message from offset {}", emittingOffset);
-                return;
-            }
-            
-            int count = 0;
-            for (MessageAndOffset msg : msgs) {
-                count += 1;
-                emittingMessages.add(msg);
-                emittingOffset = msg.offset();
-                pendingOffsets.add(emittingOffset);
-                LOG.debug("fillmessage fetched a message:{}, offset:{}", msg.message().toString(), msg.offset());
-            }
-            long end = System.currentTimeMillis();
-            LOG.info("fetch message from partition:"+partition+", offset:" + emittingOffset+", size:"+msgs.sizeInBytes()+", count:"+count +", time:"+(end-start));
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOG.error(e.getMessage(),e);
-        }
-    }
-
-    public void commitState() {
-        try {
-            long lastOffset = 0;
-            if (pendingOffsets.isEmpty() || pendingOffsets.size() <= 0) {
-                lastOffset = emittingOffset;
-            } else {
-                lastOffset = pendingOffsets.first();
-            }
-            if (lastOffset != lastCommittedOffset) {
-                Map<Object, Object> data = new HashMap<Object, Object>();
-                data.put("topology", stormConf.get(Config.TOPOLOGY_NAME));
-                data.put("offset", lastOffset);
-                data.put("partition", partition);
-                data.put("broker", ImmutableMap.of("host", consumer.getLeaderBroker().host(), "port", consumer.getLeaderBroker().port()));
-                data.put("topic", config.topic);
-                zkState.writeJSON(zkPath(), data);
-                lastCommittedOffset = lastOffset;
-            }
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-
-    }
-
-    public void ack(long offset) {
-        try {
-            pendingOffsets.remove(offset);
-        } catch (Exception e) {
-            LOG.error("offset ack error " + offset);
-        }
-    }
-
-    public void fail(long offset) {
-        failedOffsets.remove(offset);
-    }
-
-    public void close() {
-        coordinator.removeConsumer(partition);
-        consumer.close();
-    }
-
-    @SuppressWarnings("unchecked")
-    public Iterable<List<Object>> generateTuples(Message msg) {
-        Iterable<List<Object>> tups = null;
-        ByteBuffer payload = msg.payload();
-        if (payload == null) {
-            return null;
-        }
-        tups = Arrays.asList(Utils.tuple(Utils.toByteArray(payload)));
-        return tups;
-    }
-
-    private String zkPath() {
-        return config.zkRoot + "/kafka/offset/topic/" + config.topic + "/" + config.clientId + "/" + partition;
-    }
-
-    public PartitionCoordinator getCoordinator() {
-        return coordinator;
-    }
-
-    public void setCoordinator(PartitionCoordinator coordinator) {
-        this.coordinator = coordinator;
-    }
-
-    public int getPartition() {
-        return partition;
-    }
-
-    public void setPartition(int partition) {
-        this.partition = partition;
-    }
-
-    public KafkaConsumer getConsumer() {
-        return consumer;
-    }
-
-    public void setConsumer(KafkaConsumer consumer) {
-        this.consumer = consumer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionCoordinator.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionCoordinator.java
deleted file mode 100644
index 25dc368..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionCoordinator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.task.TopologyContext;
-
-public class PartitionCoordinator {
-	private KafkaSpoutConfig config;
-	private Map<Integer, PartitionConsumer> partitionConsumerMap;
-	private List<PartitionConsumer> partitionConsumers;
-
-	private ZkState zkState;
-	public PartitionCoordinator(Map conf, KafkaSpoutConfig config, TopologyContext context, ZkState zkState) {
-		this.config = config;
-		this.zkState = zkState; 
-		partitionConsumers = new LinkedList<PartitionConsumer>();
-		createPartitionConsumers(conf, context);
-	}
-
-	private void createPartitionConsumers(Map conf, TopologyContext context) {
-	    partitionConsumerMap = new HashMap<Integer, PartitionConsumer>();
-        int taskSize = context.getComponentTasks(context.getThisComponentId()).size();
-        for(int i=context.getThisTaskIndex(); i<config.numPartitions; i+=taskSize) {
-            PartitionConsumer partitionConsumer = new PartitionConsumer(conf, config, i, zkState);
-            partitionConsumers.add(partitionConsumer);
-            partitionConsumerMap.put(i, partitionConsumer);
-        }
-	}
-
-	public List<PartitionConsumer> getPartitionConsumers() {
-		return partitionConsumers;
-	}
-	
-	public PartitionConsumer getConsumer(int partition) {
-		return partitionConsumerMap.get(partition);
-	}
-	
-	public void removeConsumer(int partition) {
-	    PartitionConsumer partitionConsumer = partitionConsumerMap.get(partition);
-		partitionConsumers.remove(partitionConsumer);
-		partitionConsumerMap.remove(partition);
-	}
-	
-	
-	 
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/ZkState.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/ZkState.java
deleted file mode 100644
index ac512e5..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/ZkState.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.CreateBuilder;
-import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.zookeeper.CreateMode;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZkState {
-    public static final Logger LOG = LoggerFactory.getLogger(ZkState.class);
-    CuratorFramework _curator;
-
-    private CuratorFramework newCurator(Map conf, KafkaSpoutConfig config) throws Exception {
-        String serverPorts = "";
-        List<Host> zkServers = config.zkServers;
-        for (Host server : zkServers) {
-            serverPorts = serverPorts + server.getHost() + ":" + server.getPort() + ",";
-        }
-        return CuratorFrameworkFactory.newClient(serverPorts, Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), 15000, new RetryNTimes(
-                Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
-    }
-
-    public CuratorFramework getCurator() {
-        assert _curator != null;
-        return _curator;
-    }
-
-    public ZkState(Map stateConf, KafkaSpoutConfig config) {
-        try {
-            _curator = newCurator(stateConf, config);
-            _curator.start();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void writeJSON(String path, Map<Object, Object> data) {
-        LOG.info("Writing " + path + " the data " + data.toString());
-        writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8")));
-    }
-
-    public void writeBytes(String path, byte[] bytes) {
-        try {
-            if (_curator.checkExists().forPath(path) == null) {
-                CreateBuilder builder = _curator.create();
-                ProtectACLCreateModePathAndBytesable<String> createAble = (ProtectACLCreateModePathAndBytesable<String>) builder
-                        .creatingParentsIfNeeded();
-                createAble.withMode(CreateMode.PERSISTENT).forPath(path, bytes);
-            } else {
-                _curator.setData().forPath(path, bytes);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Map<Object, Object> readJSON(String path) {
-        try {
-            byte[] b = readBytes(path);
-            if (b == null)
-                return null;
-            return (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8"));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public byte[] readBytes(String path) {
-        try {
-            if (_curator.checkExists().forPath(path) != null) {
-                return _curator.getData().forPath(path);
-            } else {
-                return null;
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void close() {
-        _curator.close();
-        _curator = null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/test/java/com/alibaba/jstorm/test/kafka/KafkaTest.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/test/java/com/alibaba/jstorm/test/kafka/KafkaTest.java b/jstorm-utility/jstorm-kafka/src/test/java/com/alibaba/jstorm/test/kafka/KafkaTest.java
deleted file mode 100644
index a63adec..0000000
--- a/jstorm-utility/jstorm-kafka/src/test/java/com/alibaba/jstorm/test/kafka/KafkaTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.alibaba.jstorm.test.kafka;
-
-import java.util.Properties;
-
-import kafka.server.KafkaServerStartable;
-
-import com.netflix.curator.framework.CuratorFramework;
-import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.retry.ExponentialBackoffRetry;
-import com.netflix.curator.test.TestingServer;
-
-public class KafkaTest {
-    private final int port = 49123;
-    private KafkaServerStartable kafka;
-    private TestingServer server;
-    private String zookeeperConnectionString;
-
-    public KafkaTest() {}
-    
-    public void run() {
-        try {
-            server = new TestingServer();
-            zookeeperConnectionString = server.getConnectString();
-            ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(
-                    1000, 3);
-            CuratorFramework zookeeper = CuratorFrameworkFactory.newClient(
-                    zookeeperConnectionString, retryPolicy);
-            zookeeper.start();
-            Properties p = new Properties();
-            p.setProperty("zookeeper.connect", zookeeperConnectionString);
-            p.setProperty("broker.id", "0");
-            p.setProperty("port", "" + port);
-            kafka.server.KafkaConfig config = new kafka.server.KafkaConfig(p);
-            kafka = new KafkaServerStartable(config);
-            kafka.startup();
-        } catch (Exception ex) {
-            throw new RuntimeException("Could not start test broker", ex);
-        }
-    }
-
-    public String getBrokerConnectionString() {
-        return "localhost:" + port;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public void shutdown() {
-        kafka.shutdown();
-    }
-    
-    public static void main(String[] args) {
-        KafkaTest test = new KafkaTest();
-        test.run();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/pom.xml
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/pom.xml b/jstorm-utility/jstorm-rocket-mq/pom.xml
deleted file mode 100644
index ffc611e..0000000
--- a/jstorm-utility/jstorm-rocket-mq/pom.xml
+++ /dev/null
@@ -1,94 +0,0 @@
-<?xml version="1.0"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-    <groupId>com.alibaba.jstorm</groupId>
-    <artifactId>metaspout</artifactId>
-    <version>0.2.0-SNAPSHOT</version>
-
-	<properties>
-		<jstorm.version>0.9.6.1</jstorm.version>
-	</properties>
-	
-	<build>
-		<plugins>
-			<plugin>
-				<artifactId>maven-assembly-plugin</artifactId>
-				<configuration>
-					<descriptorRefs>
-						<descriptorRef>jar-with-dependencies</descriptorRef>
-					</descriptorRefs>
-				</configuration>
-				<executions>
-					<execution>
-						<id>make-assembly</id>
-						<phase>package</phase>
-						<goals>
-							<goal>single</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-
-	<dependencies>
-	    <dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-client</artifactId>
-			<version>${jstorm.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-client-extension</artifactId>
-			<version>${jstorm.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-server</artifactId>
-			<version>${jstorm.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>4.4</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.taobao.metaq.final</groupId>
-			<artifactId>metaq-client</artifactId>
-			<version>3.1.8</version>
-		</dependency>
-		<!-- 
-		<dependency>
-			<groupId>com.alibaba.rocketmq</groupId>
-			<artifactId>rocketmq-common</artifactId>
-			<version>3.0.1</version>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba.rocketmq</groupId>
-			<artifactId>rocketmq-client</artifactId>
-			<version>3.0.1</version>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba.rocketmq</groupId>
-			<artifactId>rocketmq-remoting</artifactId>
-			<version>3.0.1</version>
-		</dependency>
-		 -->
-
-	</dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaClientConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaClientConfig.java b/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaClientConfig.java
deleted file mode 100644
index f8a9c9c..0000000
--- a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaClientConfig.java
+++ /dev/null
@@ -1,263 +0,0 @@
-package com.alibaba.aloha.meta;
-
-import java.io.Serializable;
-import java.util.Date;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.TimeFormat;
-
-/**
- * Meta Spout Setting
- * 
- * All needed configs must prepare before submit topopoly
- * 
- * @author zhongyan.feng/zhiyuan.ls
- */
-public class MetaClientConfig implements Serializable {
-
-	private static final long serialVersionUID = 4157424979688593280L;
-
-	public static final String META_TOPIC = "meta.topic";
-	public static final String META_CONSUMER_GROUP = "meta.consumer.group";
-	public static final String META_SUBEXPRESS = "meta.subexpress";
-	public static final String META_NAMESERVER = "meta.nameserver";
-	//pull interval(ms) from meta server 
-	public static final String META_PULL_INTERVAL = "meta.pull.interval.ms";
-	// max fail times
-	public static final String META_MAX_FAIL_TIMES = "meta.max.fail.times";
-	// meta client internal queue size
-	public static final String META_INTERNAL_QUEUE_SIZE = "meta.internal.queue.size";
-	// spout send one batch size
-	public static final String META_BATCH_SEND_MSG_SIZE = "meta.batch.send.msg.size";
-	// meta client pull batch size from meta server
-	public static final String META_BATCH_PULL_MSG_SIZE = "meta.batch.pull.msg.size";
-	// meta client pull thread num
-	public static final String META_PULL_THREAD_NUM = "meta.pull.thread.num";
-	// meta message automatically ack
-	public static final String META_SPOUT_AUTO_ACK = "meta.spout.auto.ack";
-	// enable meta spout flow control
-	public static final String META_SPOUT_FLOW_CONTROL= "meta.spout.flow.control";
-	
-	// format is "yyyyMMddHHmmss"
-	// set the meta client offset from this timestamp
-	public static final String META_CONSUMER_START_TIMESTAMP = "meta.consumer.start.timestamp";
-	public static final String META_EXTRA_PROPERTIES = "meta.extra.properties";
-
-
-	private final String consumerGroup;
-
-	/**
-	 * Alipay need set nameServer, taobao don't need set this field
-	 */
-	private final String nameServer;
-
-	private final String topic;
-
-	private final String subExpress;
-
-	/**
-	 * The max allowed failures for one single message, skip the failure message
-	 * if excesses
-	 * 
-	 * -1 means try again until success
-	 */
-	private int maxFailTimes = DEFAULT_FAIL_TIME;
-	public static final int DEFAULT_FAIL_TIME = 5;
-
-	/**
-	 * Local messages threshold, trigger flow control if excesses
-	 * 
-	 */
-	private int queueSize = DEFAULT_QUEUE_SIZE;
-	public static final int DEFAULT_QUEUE_SIZE = 256;
-
-	/**
-	 * fetch messages size from local queue
-	 * it is also sending batch size
-	 * 
-	 */
-	private int sendBatchSize = DEFAULT_BATCH_MSG_NUM;
-	public static final int DEFAULT_BATCH_MSG_NUM = 32;
-
-	/**
-	 * pull message size from meta server 
-	 * 
-	 */
-	private int pullBatchSize = DEFAULT_BATCH_MSG_NUM;
-
-	/**
-	 * pull interval(ms) from server for every batch
-	 * 
-	 */
-	private long pullInterval = 0;
-
-	/**
-	 * pull threads num
-	 */
-	private int pullThreadNum = DEFAULT_PULL_THREAD_NUM;
-	public static int DEFAULT_PULL_THREAD_NUM = 4;
-
-	/**
-	 * Consumer start time Null means start from the last consumption
-	 * time(CONSUME_FROM_LAST_OFFSET)
-	 * 
-	 */
-	private Date startTimeStamp;
-
-	private Properties peroperties;
-
-	protected MetaClientConfig(String consumerGroup, String nameServer,
-			String topic, String subExpress) {
-		this.consumerGroup = consumerGroup;
-		this.nameServer = nameServer;
-		this.topic = topic;
-		this.subExpress = subExpress;
-	}
-	
-	public MetaClientConfig(Map conf) {
-		topic = (String) conf.get(META_TOPIC);
-		consumerGroup = (String) conf.get(META_CONSUMER_GROUP);
-		subExpress = (String) conf.get(META_SUBEXPRESS);
-		if (StringUtils.isBlank((String) conf.get(META_NAMESERVER)) == false) {
-			nameServer = (String) conf.get(META_NAMESERVER);
-		}else {
-			nameServer = null;
-		}
-		
-		maxFailTimes = JStormUtils.parseInt(conf.get(META_MAX_FAIL_TIMES), 
-				DEFAULT_FAIL_TIME);
-		
-		queueSize = JStormUtils.parseInt(conf.get(META_INTERNAL_QUEUE_SIZE), 
-				DEFAULT_QUEUE_SIZE);
-		
-		sendBatchSize = JStormUtils.parseInt(conf.get(META_BATCH_SEND_MSG_SIZE),
-				DEFAULT_BATCH_MSG_NUM);
-		
-		pullBatchSize = JStormUtils.parseInt(conf.get(META_BATCH_PULL_MSG_SIZE),
-				DEFAULT_BATCH_MSG_NUM);
-		
-		pullInterval = JStormUtils.parseInt(conf.get(META_PULL_INTERVAL), 0);
-		
-		pullThreadNum = JStormUtils.parseInt(conf.get(META_PULL_THREAD_NUM), 
-				DEFAULT_PULL_THREAD_NUM);
-		
-		String ts = (String)conf.get(META_CONSUMER_START_TIMESTAMP);
-		if (ts != null) {
-			Date date = null;
-			try {
-				date = TimeFormat.getSecond(ts);
-			}catch(Exception e) {
-				
-			}
-			
-			if (date != null) {
-				startTimeStamp = date;
-			}
-		}
-		
-		Object prop = conf.get(META_EXTRA_PROPERTIES);
-		if (prop != null && prop instanceof Properties) {
-			peroperties = (Properties)prop;
-		}
-	}
-
-	public static MetaClientConfig mkInstance(Map conf) {
-
-		return new MetaClientConfig(conf);
-	}
-
-	
-
-	public int getMaxFailTimes() {
-		return maxFailTimes;
-	}
-
-	public void setMaxFailTimes(int maxFailTimes) {
-		this.maxFailTimes = maxFailTimes;
-	}
-
-	public int getQueueSize() {
-		return queueSize;
-	}
-
-	public void setQueueSize(int queueSize) {
-		this.queueSize = queueSize;
-	}
-
-	public int getSendBatchSize() {
-		return sendBatchSize;
-	}
-
-	public void setSendBatchSize(int sendBatchSize) {
-		this.sendBatchSize = sendBatchSize;
-	}
-
-	public int getPullBatchSize() {
-		return pullBatchSize;
-	}
-
-	public void setPullBatchSize(int pullBatchSize) {
-		this.pullBatchSize = pullBatchSize;
-	}
-
-	public long getPullInterval() {
-		return pullInterval;
-	}
-
-	public void setPullInterval(long pullInterval) {
-		this.pullInterval = pullInterval;
-	}
-
-	public int getPullThreadNum() {
-		return pullThreadNum;
-	}
-
-	public void setPullThreadNum(int pullThreadNum) {
-		this.pullThreadNum = pullThreadNum;
-	}
-
-	public Date getStartTimeStamp() {
-		return startTimeStamp;
-	}
-
-	public void setStartTimeStamp(Date startTimeStamp) {
-		this.startTimeStamp = startTimeStamp;
-	}
-
-	public Properties getPeroperties() {
-		return peroperties;
-	}
-
-	public void setPeroperties(Properties peroperties) {
-		this.peroperties = peroperties;
-	}
-
-	public String getConsumerGroup() {
-		return consumerGroup;
-	}
-
-	public String getNameServer() {
-		return nameServer;
-	}
-
-	public String getTopic() {
-		return topic;
-	}
-
-	public String getSubExpress() {
-		return subExpress;
-	}
-
-	@Override
-	public String toString() {
-		return ToStringBuilder.reflectionToString(this,
-				ToStringStyle.SHORT_PREFIX_STYLE);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaConsumerFactory.java b/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaConsumerFactory.java
deleted file mode 100644
index 52db507..0000000
--- a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaConsumerFactory.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package com.alibaba.aloha.meta;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.taobao.metaq.client.MetaHelper;
-import com.taobao.metaq.client.MetaPushConsumer;
-
-public class MetaConsumerFactory {
-	
-	private static final Logger	LOG   = Logger.getLogger(MetaConsumerFactory.class);
-	
-    
-    private static final long                 serialVersionUID = 4641537253577312163L;
-    
-    public static Map<String, MetaPushConsumer> consumers = 
-    		new HashMap<String, MetaPushConsumer>();
-    
-    public static synchronized MetaPushConsumer mkInstance(MetaClientConfig config, 
-			MessageListenerConcurrently listener)  throws Exception{
-    	
-    	String topic = config.getTopic();
-    	String groupId = config.getConsumerGroup();
-    	
-    	String key = topic + "@" + groupId;
-    	
-    	MetaPushConsumer consumer = consumers.get(key);
-    	if (consumer != null) {
-    		
-    		LOG.info("Consumer of " + key + " has been created, don't recreate it ");
-    		
-    		//Attention, this place return null to info duplicated consumer
-    		return null;
-    	}
-    	
-        
-        StringBuilder sb = new StringBuilder();
-        sb.append("Begin to init meta client \n");
-        sb.append(",configuration:").append(config);
-        
-        LOG.info(sb.toString());
-        
-        consumer = new MetaPushConsumer(config.getConsumerGroup());
-        
-        String nameServer = config.getNameServer();
-        if ( nameServer != null) {
-			String namekey = "rocketmq.namesrv.domain";
-
-			String value = System.getProperty(namekey);
-			// this is for alipay
-			if (value == null) {
-
-				System.setProperty(namekey, nameServer);
-			} else if (value.equals(nameServer) == false) {
-				throw new Exception(
-						"Different nameserver address in the same worker "
-								+ value + ":" + nameServer);
-
-			}
-		}
-        
-        String instanceName = groupId +"@" +	JStormUtils.process_pid();
-		consumer.setInstanceName(instanceName);
-		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-		consumer.subscribe(config.getTopic(), config.getSubExpress());
-		consumer.registerMessageListener(listener);
-		
-		consumer.setPullThresholdForQueue(config.getQueueSize());
-		consumer.setConsumeMessageBatchMaxSize(config.getSendBatchSize());
-		consumer.setPullBatchSize(config.getPullBatchSize());
-		consumer.setPullInterval(config.getPullInterval());
-		consumer.setConsumeThreadMin(config.getPullThreadNum());
-		consumer.setConsumeThreadMax(config.getPullThreadNum());
-
-
-		Date date = config.getStartTimeStamp() ;
-		if ( date != null) {
-			LOG.info("Begin to reset meta offset to " + date);
-			try {
-				MetaHelper.resetOffsetByTimestamp(MessageModel.CLUSTERING,
-					instanceName, config.getConsumerGroup(),
-					config.getTopic(), date.getTime());
-				LOG.info("Successfully reset meta offset to " + date);
-			}catch(Exception e) {
-				LOG.error("Failed to reset meta offset to " + date);
-			}
-
-		}else {
-			LOG.info("Don't reset meta offset  ");
-		}
-
-		consumer.start();
-		
-		consumers.put(key, consumer);
-		LOG.info("Successfully create " + key + " consumer");
-		
-		
-		return consumer;
-		
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaSpout.java b/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaSpout.java
deleted file mode 100644
index e6c3a26..0000000
--- a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaSpout.java
+++ /dev/null
@@ -1,248 +0,0 @@
-package com.alibaba.aloha.meta;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import com.alibaba.jstorm.client.metric.MetricClient;
-import com.alibaba.jstorm.client.spout.IAckValueSpout;
-import com.alibaba.jstorm.client.spout.IFailValueSpout;
-import com.alibaba.jstorm.metric.JStormHistogram;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.taobao.metaq.client.MetaPushConsumer;
-
-public class MetaSpout implements IRichSpout, IAckValueSpout, IFailValueSpout,
-		MessageListenerConcurrently {
-	/**  */
-	private static final long serialVersionUID = 8476906628618859716L;
-	private static final Logger LOG = Logger.getLogger(MetaSpout.class);
-
-	protected MetaClientConfig metaClientConfig;
-	protected SpoutOutputCollector collector;
-	protected transient MetaPushConsumer consumer;
-
-	protected Map conf;
-	protected String id;
-	protected boolean flowControl;
-	protected boolean autoAck;
-
-	protected transient LinkedBlockingDeque<MetaTuple> sendingQueue;
-
-	protected transient MetricClient metricClient;
-	protected transient JStormHistogram waithHistogram;
-	protected transient JStormHistogram processHistogram;
-
-	public MetaSpout() {
-
-	}
-
-	public void initMetricClient(TopologyContext context) {
-		metricClient = new MetricClient(context);
-		waithHistogram = metricClient.registerHistogram("MetaTupleWait", null);
-		processHistogram = metricClient.registerHistogram("MetaTupleProcess",
-				null);
-	}
-
-	@Override
-	public void open(Map conf, TopologyContext context,
-			SpoutOutputCollector collector) {
-		this.conf = conf;
-		this.collector = collector;
-		this.id = context.getThisComponentId() + ":" + context.getThisTaskId();
-		this.sendingQueue = new LinkedBlockingDeque<MetaTuple>();
-
-		this.flowControl = JStormUtils.parseBoolean(
-				conf.get(MetaClientConfig.META_SPOUT_FLOW_CONTROL), true);
-		this.autoAck = JStormUtils.parseBoolean(
-				conf.get(MetaClientConfig.META_SPOUT_AUTO_ACK), false);
-		
-		StringBuilder sb = new StringBuilder();
-		sb.append("Begin to init MetaSpout:").append(id);
-		sb.append(", flowControl:").append(flowControl);
-		sb.append(", autoAck:").append(autoAck);
-		LOG.info( sb.toString());
-
-		initMetricClient(context);
-
-		metaClientConfig = MetaClientConfig.mkInstance(conf);
-
-		try {
-			consumer = MetaConsumerFactory.mkInstance(metaClientConfig, this);
-		} catch (Exception e) {
-			LOG.error("Failed to create Meta Consumer ", e);
-			throw new RuntimeException("Failed to create MetaConsumer" + id, e);
-		}
-
-		if (consumer == null) {
-			LOG.warn(id
-					+ " already exist consumer in current worker, don't need to fetch data ");
-
-			new Thread(new Runnable() {
-
-				@Override
-				public void run() {
-					while (true) {
-						try {
-							Thread.sleep(10000);
-						} catch (InterruptedException e) {
-							break;
-						}
-
-						StringBuilder sb = new StringBuilder();
-						sb.append("Only on meta consumer can be run on one process,");
-						sb.append(" but there are mutliple spout consumes with the same topic@groupid meta, so the second one ");
-						sb.append(id).append(" do nothing ");
-						LOG.info(sb.toString());
-					}
-				}
-			}).start();
-		}
-		
-		LOG.info("Successfully init " + id);
-	}
-
-	@Override
-	public void close() {
-		if (consumer != null) {
-			consumer.shutdown();
-		}
-
-	}
-
-	@Override
-	public void activate() {
-		if (consumer != null) {
-			consumer.resume();
-		}
-
-	}
-
-	@Override
-	public void deactivate() {
-		if (consumer != null) {
-			consumer.suspend();
-		}
-	}
-
-	public void sendTuple(MetaTuple metaTuple) {
-		metaTuple.updateEmitMs();
-		collector.emit(new Values(metaTuple), metaTuple.getCreateMs());
-	}
-
-	@Override
-	public void nextTuple() {
-		MetaTuple metaTuple = null;
-		try {
-			metaTuple = sendingQueue.take();
-		} catch (InterruptedException e) {
-		}
-
-		if (metaTuple == null) {
-			return;
-		}
-
-		sendTuple(metaTuple);
-
-	}
-
-	@Deprecated
-	public void ack(Object msgId) {
-		LOG.warn("Shouldn't go this function");
-	}
-
-	@Deprecated
-	public void fail(Object msgId) {
-		LOG.warn("Shouldn't go this function");
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("MetaTuple"));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-	@Override
-	public void fail(Object msgId, List<Object> values) {
-		MetaTuple metaTuple = (MetaTuple) values.get(0);
-		AtomicInteger failTimes = metaTuple.getFailureTimes();
-
-		int failNum = failTimes.incrementAndGet();
-		if (failNum > metaClientConfig.getMaxFailTimes()) {
-			LOG.warn("Message " + metaTuple.getMq() + " fail times " + failNum);
-			finishTuple(metaTuple);
-			return;
-		}
-
-		if (flowControl) {
-			sendingQueue.offer(metaTuple);
-		} else {
-			sendTuple(metaTuple);
-		}
-	}
-
-	public void finishTuple(MetaTuple metaTuple) {
-		waithHistogram.update(metaTuple.getEmitMs() - metaTuple.getCreateMs());
-		processHistogram.update(System.currentTimeMillis() - metaTuple.getEmitMs());
-		metaTuple.done();
-	}
-
-	@Override
-	public void ack(Object msgId, List<Object> values) {
-		MetaTuple metaTuple = (MetaTuple) values.get(0);
-		finishTuple(metaTuple);
-	}
-
-	@Override
-	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
-			ConsumeConcurrentlyContext context) {
-		try {
-			MetaTuple metaTuple = new MetaTuple(msgs, context.getMessageQueue());
-
-			if (flowControl) {
-				sendingQueue.offer(metaTuple);
-			} else {
-				sendTuple(metaTuple);
-			}
-
-			if (autoAck) {
-				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-			} else {
-				metaTuple.waitFinish();
-				if (metaTuple.isSuccess() == true) {
-					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-				} else {
-					return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-				}
-			}
-
-		} catch (Exception e) {
-			LOG.error("Failed to emit " + id, e);
-			return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-		}
-
-	}
-
-	public MetaPushConsumer getConsumer() {
-		return consumer;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaTuple.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaTuple.java b/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaTuple.java
deleted file mode 100644
index d735749..0000000
--- a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaTuple.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.alibaba.aloha.meta;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-public class MetaTuple implements Serializable {
-
-	/**  */
-	private static final long serialVersionUID = 2277714452693486955L;
-
-	protected final List<MessageExt> msgList;
-	protected final MessageQueue mq;
-
-	protected final AtomicInteger failureTimes;
-	protected final long createMs;
-	protected long emitMs;
-
-	protected transient CountDownLatch latch;
-	protected transient boolean isSuccess;
-
-	public MetaTuple(List<MessageExt> msgList, MessageQueue mq) {
-		this.msgList = msgList;
-		this.mq = mq;
-
-		this.failureTimes = new AtomicInteger(0);
-		this.createMs = System.currentTimeMillis();
-
-		this.latch = new CountDownLatch(1);
-		this.isSuccess = false;
-	}
-
-	public AtomicInteger getFailureTimes() {
-		return failureTimes;
-	}
-	
-	public long getCreateMs() {
-		return createMs;
-	}
-
-	public long getEmitMs() {
-		return emitMs;
-	}
-
-	public void updateEmitMs() {
-		this.emitMs = System.currentTimeMillis();
-	}
-
-	public List<MessageExt> getMsgList() {
-		return msgList;
-	}
-	
-
-	public MessageQueue getMq() {
-		return mq;
-	}
-
-	public boolean waitFinish() throws InterruptedException {
-		return latch.await(4, TimeUnit.HOURS);
-	}
-
-	public void done() {
-		isSuccess = true;
-		latch.countDown();
-	}
-
-	public void fail() {
-		isSuccess = false;
-		latch.countDown();
-	}
-
-	public boolean isSuccess() {
-		return isSuccess;
-	}
-	
-
-	@Override
-	public String toString() {
-		return ToStringBuilder.reflectionToString(this,
-				ToStringStyle.SHORT_PREFIX_STYLE);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/TestTopology.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/TestTopology.java b/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/TestTopology.java
deleted file mode 100644
index 608b54c..0000000
--- a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/TestTopology.java
+++ /dev/null
@@ -1,150 +0,0 @@
-package com.alibaba.aloha.meta.example;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.log4j.Logger;
-import org.yaml.snakeyaml.Yaml;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-
-import com.alibaba.aloha.meta.MetaSpout;
-import com.alibaba.jstorm.utils.JStormUtils;
-
-/**
- * MonitorTopology
- * 
- * @author longda/zhiyuan.ls
- * 
- */
-public class TestTopology {
-
-	private static Logger LOG = Logger.getLogger(TestTopology.class);
-
-	public static String WRITER_COMPONENT = "writer";
-
-	public static void main(String[] args) throws Exception {
-		if (args.length == 0) {
-			System.err.println("Please input configuration file");
-			System.exit(-1);
-		}
-
-		LoadConf(args[0]);
-
-		TopologyBuilder builder = setupBuilder();
-
-		submitTopology(builder);
-
-	}
-
-	private static TopologyBuilder setupBuilder() throws Exception {
-		TopologyBuilder builder = new TopologyBuilder();
-
-		int writerParallel = JStormUtils.parseInt(
-				conf.get("topology.writer.parallel"), 1);
-
-		int spoutParallel = JStormUtils.parseInt(
-				conf.get("topology.spout.parallel"), 1);
-
-		builder.setSpout("MetaSpout", new MetaSpout(), spoutParallel);
-
-		builder.setBolt(WRITER_COMPONENT, new WriterBolt(), writerParallel)
-				.shuffleGrouping("MetaSpout");
-
-		return builder;
-	}
-
-	private static void submitTopology(TopologyBuilder builder) {
-		try {
-			if (local_mode(conf)) {
-
-				LocalCluster cluster = new LocalCluster();
-
-				cluster.submitTopology(
-						String.valueOf(conf.get("topology.name")), conf,
-						builder.createTopology());
-
-				Thread.sleep(200000);
-
-				cluster.shutdown();
-			} else {
-				StormSubmitter.submitTopology(
-						String.valueOf(conf.get("topology.name")), conf,
-						builder.createTopology());
-			}
-
-		} catch (Exception e) {
-			LOG.error(e.getMessage(), e.getCause());
-		}
-	}
-
-	private static Map conf = new HashMap<Object, Object>();
-
-	private static void LoadProperty(String prop) {
-		Properties properties = new Properties();
-
-		try {
-			InputStream stream = new FileInputStream(prop);
-			properties.load(stream);
-		} catch (FileNotFoundException e) {
-			System.out.println("No such file " + prop);
-		} catch (Exception e1) {
-			e1.printStackTrace();
-
-			return;
-		}
-
-		conf.putAll(properties);
-	}
-
-	private static void LoadYaml(String confPath) {
-
-		Yaml yaml = new Yaml();
-
-		try {
-			InputStream stream = new FileInputStream(confPath);
-
-			conf = (Map) yaml.load(stream);
-			if (conf == null || conf.isEmpty() == true) {
-				throw new RuntimeException("Failed to read config file");
-			}
-
-		} catch (FileNotFoundException e) {
-			System.out.println("No such file " + confPath);
-			throw new RuntimeException("No config file");
-		} catch (Exception e1) {
-			e1.printStackTrace();
-			throw new RuntimeException("Failed to read config file");
-		}
-
-		return;
-	}
-
-	private static void LoadConf(String arg) {
-		if (arg.endsWith("yaml")) {
-			LoadYaml(arg);
-		} else {
-			LoadProperty(arg);
-		}
-	}
-
-	public static boolean local_mode(Map conf) {
-		String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
-		if (mode != null) {
-			if (mode.equals("local")) {
-				return true;
-			}
-		}
-
-		return false;
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/WriterBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/WriterBolt.java b/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/WriterBolt.java
deleted file mode 100644
index 5eddef9..0000000
--- a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/WriterBolt.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package com.alibaba.aloha.meta.example;
-
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-
-import com.alibaba.aloha.meta.MetaTuple;
-
-
-public class WriterBolt implements IRichBolt {
-
-    private static final long serialVersionUID = 2495121976857546346L;
-    
-    private static final Logger LOG              = Logger.getLogger(WriterBolt.class);
-
-    protected OutputCollector      collector;
-    
-    public void prepare(Map stormConf, TopologyContext context,
-            OutputCollector collector) {
-        this.collector = collector;
-        
-    }
-    
-    public void execute(Tuple tuple) {
-        // TODO Auto-generated method stub
-        MetaTuple metaTuple = (MetaTuple)tuple.getValue(0);
-        
-        try {
-            LOG.info("Messages:" + metaTuple);
-            
-        } catch (Exception e) {
-            collector.fail(tuple);
-            return ;
-            //throw new FailedException(e);
-        }
-        
-        collector.ack(tuple);
-    }
-    
-    public void cleanup() {
-        // TODO Auto-generated method stub
-    }
-    
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        // TODO Auto-generated method stub
-        
-    }
-    
-    public Map<String, Object> getComponentConfiguration() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/MANIFEST.MF
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/MANIFEST.MF b/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/MANIFEST.MF
deleted file mode 100644
index 458dd09..0000000
--- a/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/MANIFEST.MF
+++ /dev/null
@@ -1,5 +0,0 @@
-Manifest-Version: 1.0
-Built-By: basti.lj
-Build-Jdk: 1.6.0_45
-Created-By: Maven Integration for Eclipse
-

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.properties
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.properties b/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.properties
deleted file mode 100644
index ad01a91..0000000
--- a/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.properties
+++ /dev/null
@@ -1,7 +0,0 @@
-#Generated by Maven Integration for Eclipse
-#Mon Nov 03 15:24:06 CST 2014
-version=0.2.0-SNAPSHOT
-groupId=com.alibaba.jstorm
-m2e.projectName=metaspout
-m2e.projectLocation=D\:\\code\\aloha_branch\\github_master\\jstorm\\jstorm-utility\\jstorm-rocket-mq
-artifactId=metaspout

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.xml
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.xml b/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.xml
deleted file mode 100644
index ffc611e..0000000
--- a/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.xml
+++ /dev/null
@@ -1,94 +0,0 @@
-<?xml version="1.0"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-    <groupId>com.alibaba.jstorm</groupId>
-    <artifactId>metaspout</artifactId>
-    <version>0.2.0-SNAPSHOT</version>
-
-	<properties>
-		<jstorm.version>0.9.6.1</jstorm.version>
-	</properties>
-	
-	<build>
-		<plugins>
-			<plugin>
-				<artifactId>maven-assembly-plugin</artifactId>
-				<configuration>
-					<descriptorRefs>
-						<descriptorRef>jar-with-dependencies</descriptorRef>
-					</descriptorRefs>
-				</configuration>
-				<executions>
-					<execution>
-						<id>make-assembly</id>
-						<phase>package</phase>
-						<goals>
-							<goal>single</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-
-	<dependencies>
-	    <dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-client</artifactId>
-			<version>${jstorm.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-client-extension</artifactId>
-			<version>${jstorm.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba.jstorm</groupId>
-			<artifactId>jstorm-server</artifactId>
-			<version>${jstorm.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>4.4</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.taobao.metaq.final</groupId>
-			<artifactId>metaq-client</artifactId>
-			<version>3.1.8</version>
-		</dependency>
-		<!-- 
-		<dependency>
-			<groupId>com.alibaba.rocketmq</groupId>
-			<artifactId>rocketmq-common</artifactId>
-			<version>3.0.1</version>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba.rocketmq</groupId>
-			<artifactId>rocketmq-client</artifactId>
-			<version>3.0.1</version>
-		</dependency>
-		<dependency>
-			<groupId>com.alibaba.rocketmq</groupId>
-			<artifactId>rocketmq-remoting</artifactId>
-			<version>3.0.1</version>
-		</dependency>
-		 -->
-
-	</dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/test/main/resources/metaspout.yaml
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/test/main/resources/metaspout.yaml b/jstorm-utility/jstorm-rocket-mq/test/main/resources/metaspout.yaml
deleted file mode 100644
index f007772..0000000
--- a/jstorm-utility/jstorm-rocket-mq/test/main/resources/metaspout.yaml
+++ /dev/null
@@ -1,32 +0,0 @@
-
-#Meta Client Configuration
-# Please refer MetaClientConfig for every setting's details
-meta.topic: "bbl_user"
-meta.consumer.group: "bbl_user"
-meta.subexpress: "*"
-#meta.nameserver: ""
-#meta.pull.interval.ms: 0
-#meta.max.fail.times: 5
-#meta.internal.queue.size: 256
-#meta.batch.send.msg.size: 16
-#meta.batch.pull.msg.size: 32
-#meta.pull.thread.num: 4
-#meta.spout.auto.ack: false
-#meta.spout.flow.contro: true
-#yyyyMMddHHmmss
-meta.consumer.start.timestamp: "20141011000000"
-#meta.extra.properties: 
-
-topology.name: test_meta_spout
-topology.version: 1.0.0
-topology.workers: 5
-topology.max.spout.pending: 10
-topology.acker.executors: 1
-
-topology.debug: false
-topology.debug.recv.tuple: false
-storm.cluster.mode: local
-
-topology.spout.parallel: 2
-topology.writer.parallel: 1
-

http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/conf/ons.yaml
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/conf/ons.yaml b/jstorm-utility/ons/conf/ons.yaml
deleted file mode 100644
index 5736422..0000000
--- a/jstorm-utility/ons/conf/ons.yaml
+++ /dev/null
@@ -1,49 +0,0 @@
-#############################################################
-###################### ONS Setting Begin ####################
-Topic: "longdatest"
-SubExpress: "*"
-AccessKey: null
-SecretKey: null
-
-ConsumerId: "CID-LONGDA-123"
-ConsumeThreadNums: 4
-ProducerId: "PID_25770293805-101"
-
-#SendMsgTimeoutMillis: 
-#MessageModel:
-#ONSAddr:
-#NAMESRV_ADDR:
-
-###################### ONS Setting End ######################
-#############################################################
-
-
-#############################################################
-############### JStorm Topology Setting Begin ###############
-
-#ons spout enable flow control setting
-# all message will be sent in Spout.nextTuple
-OnsSpoutFlowControl: true
-
-# spout consume one with autoAck mode
-# if disable, consumer offset won't move on until do spout.ack 
-OnsSpoutAutoAck: false
-
-# if one message fail times is bigger than the OnsMsgMaxFailTimes
-# it will be thrown
-OnsMsgMaxFailTimes: 5
-
-topology.name: "ons_test"
-topology.consumer.parallel: 1
-topology.producer.parallel: 1
-worker.memory.size: 2147483648
-topology.workers: 1
-topology.acker.executors: 0
-storm.cluster.mode: "local"
-
-############### JStorm Topology Setting End ###############
-#############################################################
-
-
-    
-