You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:37 UTC
[58/60] [abbrv] storm git commit: remove jstorm-utility directory
remove jstorm-utility directory
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e8f64d5e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e8f64d5e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e8f64d5e
Branch: refs/heads/jstorm-import
Commit: e8f64d5e88a8f5c29c1633104d06de0970d57676
Parents: e1f6844
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Nov 5 15:23:13 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Nov 5 15:23:13 2015 -0500
----------------------------------------------------------------------
.gitmodules | 3 -
jstorm-utility/jstorm-kafka/.gitignore | 1 -
jstorm-utility/jstorm-kafka/README.md | 0
jstorm-utility/jstorm-kafka/pom.xml | 146 --------
.../java/com/alibaba/jstorm/kafka/Host.java | 57 ----
.../com/alibaba/jstorm/kafka/KafkaConsumer.java | 241 -------------
.../alibaba/jstorm/kafka/KafkaMessageId.java | 27 --
.../com/alibaba/jstorm/kafka/KafkaSpout.java | 124 -------
.../alibaba/jstorm/kafka/KafkaSpoutConfig.java | 130 -------
.../alibaba/jstorm/kafka/PartitionConsumer.java | 227 -------------
.../jstorm/kafka/PartitionCoordinator.java | 49 ---
.../java/com/alibaba/jstorm/kafka/ZkState.java | 95 ------
.../alibaba/jstorm/test/kafka/KafkaTest.java | 57 ----
jstorm-utility/jstorm-rocket-mq/pom.xml | 94 -----
.../alibaba/aloha/meta/MetaClientConfig.java | 263 --------------
.../alibaba/aloha/meta/MetaConsumerFactory.java | 109 ------
.../java/com/alibaba/aloha/meta/MetaSpout.java | 248 --------------
.../java/com/alibaba/aloha/meta/MetaTuple.java | 90 -----
.../aloha/meta/example/TestTopology.java | 150 --------
.../alibaba/aloha/meta/example/WriterBolt.java | 59 ----
.../target/classes/META-INF/MANIFEST.MF | 5 -
.../com.alibaba.jstorm/metaspout/pom.properties | 7 -
.../maven/com.alibaba.jstorm/metaspout/pom.xml | 94 -----
.../test/main/resources/metaspout.yaml | 32 --
jstorm-utility/ons/conf/ons.yaml | 49 ---
jstorm-utility/ons/pom.xml | 101 ------
.../java/com/alibaba/jstorm/LoadConfig.java | 67 ----
.../java/com/alibaba/jstorm/TestTopology.java | 80 -----
.../java/com/alibaba/jstorm/ons/OnsConfig.java | 69 ----
.../java/com/alibaba/jstorm/ons/OnsTuple.java | 80 -----
.../jstorm/ons/consumer/ConsumerConfig.java | 65 ----
.../jstorm/ons/consumer/ConsumerFactory.java | 49 ---
.../jstorm/ons/consumer/ConsumerSpout.java | 268 ---------------
.../jstorm/ons/producer/ProducerBolt.java | 94 -----
.../jstorm/ons/producer/ProducerConfig.java | 29 --
.../jstorm/ons/producer/ProducerFactory.java | 59 ----
.../ons/test/main/resources/metaspout.yaml | 32 --
jstorm-utility/rocket-mq | 1 -
jstorm-utility/topology-monitor/.gitignore | 13 -
jstorm-utility/topology-monitor/README.md | 2 -
jstorm-utility/topology-monitor/pom.xml | 110 ------
.../cosmos/BlackholeBlockingQueueSpout.java | 114 -------
.../com/dianping/cosmos/BlackholeSpout.java | 101 ------
.../com/dianping/cosmos/MessageFetcher.java | 50 ---
.../java/com/dianping/cosmos/PumaSpout.java | 194 -----------
.../java/com/dianping/cosmos/RedisSinkBolt.java | 167 ---------
.../main/java/com/dianping/cosmos/Updater.java | 9 -
.../cosmos/metric/CatMetricsConsumer.java | 70 ----
.../dianping/cosmos/monitor/HttpCatClient.java | 57 ----
.../cosmos/monitor/HttpClientService.java | 120 -------
.../dianping/cosmos/monitor/SpoutCounter.java | 24 --
.../cosmos/monitor/TopologyMonitor.java | 90 -----
.../monitor/topology/ClusterInfoBolt.java | 170 ----------
.../monitor/topology/ClusterInfoTopology.java | 18 -
.../com/dianping/cosmos/util/CatClient.java | 19 --
.../com/dianping/cosmos/util/CatMetricUtil.java | 45 ---
.../com/dianping/cosmos/util/Constants.java | 9 -
.../java/com/dianping/cosmos/util/JSONUtil.java | 125 -------
.../com/dianping/cosmos/util/TupleHelpers.java | 33 --
.../transaction_meta_spout/conf/topology.yaml | 21 --
jstorm-utility/transaction_meta_spout/pom.xml | 68 ----
.../batch/example/BatchMetaRebalance.java | 108 ------
.../jstorm/batch/example/BatchMetaSpout.java | 131 -------
.../jstorm/batch/example/BatchMetaTopology.java | 163 ---------
.../alibaba/jstorm/batch/example/CountBolt.java | 84 -----
.../alibaba/jstorm/batch/example/DBBolt.java | 261 --------------
.../jstorm/batch/example/TransformBolt.java | 63 ----
.../jstorm/batch/meta/MetaSimpleClient.java | 340 -------------------
.../jstorm/batch/meta/MetaSpoutConfig.java | 119 -------
.../src/main/resources/metaspout.default.prop | 15 -
70 files changed, 6264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/.gitmodules
----------------------------------------------------------------------
diff --git a/.gitmodules b/.gitmodules
index c68bb15..e69de29 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +0,0 @@
-[submodule "jstorm-utility/rocket-mq"]
- path = jstorm-utility/rocket-mq
- url = https://github.com/rocketmq/rocketmq-storm
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/.gitignore
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/.gitignore b/jstorm-utility/jstorm-kafka/.gitignore
deleted file mode 100644
index b83d222..0000000
--- a/jstorm-utility/jstorm-kafka/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/README.md
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/README.md b/jstorm-utility/jstorm-kafka/README.md
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/pom.xml b/jstorm-utility/jstorm-kafka/pom.xml
deleted file mode 100755
index df7c0ea..0000000
--- a/jstorm-utility/jstorm-kafka/pom.xml
+++ /dev/null
@@ -1,146 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-all</artifactId>
- <version>0.9.6.2</version>
- <relativePath>../..</relativePath>
- </parent>
-
- <modelVersion>4.0.0</modelVersion>
- <artifactId>jstorm-kafka</artifactId>
- <packaging>jar</packaging>
-
- <name>${project.artifactId}-${project.version}</name>
- <description>jstorm kafka</description>
-
- <url>http://maven.apache.org</url>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <jstorm.version>${parent.version}</jstorm.version>
- <kafka.version>0.8.1</kafka.version>
- <curator.version>1.3.2</curator.version>
- </properties>
-
- <dependencies>
-
-
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-client-extension</artifactId>
- <version>${jstorm.version}</version>
- <scope>provided</scope>
- </dependency>
-
-
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-client</artifactId>
- <version>${jstorm.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-server</artifactId>
- <version>${jstorm.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.9.2</artifactId>
- <version>${kafka.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.5</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.netflix.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>${curator.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
-
-
- <dependency>
- <groupId>com.netflix.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>${curator.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.netflix.curator</groupId>
- <artifactId>curator-test</artifactId>
- <version>${curator.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.testng</groupId>
- <artifactId>testng</artifactId>
- </exclusion>
- </exclusions>
- <scope>test</scope>
- </dependency>
-
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- <encoding>${project.build.sourceEncoding}</encoding>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/Host.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/Host.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/Host.java
deleted file mode 100644
index 36227a0..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/Host.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-import java.io.Serializable;
-/**
- *
- * @author feilaoda
- *
- */
-public class Host implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = -315213440689707962L;
- private String host;
- private int port;
-
- public Host(String host) {
- this(host, 9092);
- }
-
- public Host(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- if (obj instanceof Host) {
- final Host other = (Host) obj;
- return this.host.equals(other.host) && this.port == other.port;
- } else {
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
deleted file mode 100644
index 787b285..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
+++ /dev/null
@@ -1,241 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.common.KafkaException;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-
-/**
- *
- * @author feilaoda
- *
- */
-public class KafkaConsumer {
-
- private static Logger LOG = Logger.getLogger(KafkaConsumer.class);
-
- public static final int NO_OFFSET = -1;
-
- private int status;
- private SimpleConsumer consumer = null;
-
- private KafkaSpoutConfig config;
- private LinkedList<Host> brokerList;
- private int brokerIndex;
- private Broker leaderBroker;
-
- public KafkaConsumer(KafkaSpoutConfig config) {
- this.config = config;
- this.brokerList = new LinkedList<Host>(config.brokers);
- this.brokerIndex = 0;
- }
-
- public ByteBufferMessageSet fetchMessages(int partition, long offset) throws IOException {
-
- String topic = config.topic;
- FetchRequest req = new FetchRequestBuilder().clientId(config.clientId).addFetch(topic, partition, offset, config.fetchMaxBytes)
- .maxWait(config.fetchWaitMaxMs).build();
- FetchResponse fetchResponse = null;
- SimpleConsumer simpleConsumer = null;
- try {
- simpleConsumer = findLeaderConsumer(partition);
- if (simpleConsumer == null) {
- // LOG.error(message);
- return null;
- }
- fetchResponse = simpleConsumer.fetch(req);
- } catch (Exception e) {
- if (e instanceof ConnectException || e instanceof SocketTimeoutException || e instanceof IOException
- || e instanceof UnresolvedAddressException) {
- LOG.warn("Network error when fetching messages:", e);
- if (simpleConsumer != null) {
- String host = simpleConsumer.host();
- int port = simpleConsumer.port();
- simpleConsumer = null;
- throw new KafkaException("Network error when fetching messages: " + host + ":" + port + " , " + e.getMessage(), e);
- }
-
- } else {
- throw new RuntimeException(e);
- }
- }
- if (fetchResponse.hasError()) {
- short code = fetchResponse.errorCode(topic, partition);
- if (code == ErrorMapping.OffsetOutOfRangeCode() && config.resetOffsetIfOutOfRange) {
- long startOffset = getOffset(topic, partition, config.startOffsetTime);
- offset = startOffset;
- }
- if(leaderBroker != null) {
- LOG.error("fetch data from kafka topic[" + config.topic + "] host[" + leaderBroker.host() + ":" + leaderBroker.port() + "] partition["
- + partition + "] error:" + code);
- }else {
-
- }
- return null;
- } else {
- ByteBufferMessageSet msgs = fetchResponse.messageSet(topic, partition);
- return msgs;
- }
- }
-
- private SimpleConsumer findLeaderConsumer(int partition) {
- try {
- if (consumer != null) {
- return consumer;
- }
- PartitionMetadata metadata = findLeader(partition);
- if (metadata == null) {
- leaderBroker = null;
- consumer = null;
- return null;
- }
- leaderBroker = metadata.leader();
- consumer = new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), config.socketTimeoutMs, config.socketReceiveBufferBytes,
- config.clientId);
-
- return consumer;
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- return null;
- }
-
- protected PartitionMetadata findLeader(int partition) {
- PartitionMetadata returnMetaData = null;
- int errors = 0;
- int size = brokerList.size();
-
- Host brokerHost = brokerList.get(brokerIndex);
- try {
- if (consumer == null) {
- consumer = new SimpleConsumer(brokerHost.getHost(), brokerHost.getPort(), config.socketTimeoutMs, config.socketReceiveBufferBytes,
- config.clientId);
- }
- } catch (Exception e) {
- LOG.warn(e.getMessage(), e);
- consumer = null;
- }
- int i = brokerIndex;
- loop: while (i < size && errors < size + 1) {
- Host host = brokerList.get(i);
- i = (i + 1) % size;
- brokerIndex = i; // next index
- try {
-
- if (consumer == null) {
- consumer = new SimpleConsumer(host.getHost(), host.getPort(), config.socketTimeoutMs, config.socketReceiveBufferBytes,
- config.clientId);
- }
- List<String> topics = Collections.singletonList(config.topic);
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- kafka.javaapi.TopicMetadataResponse resp = null;
- try {
- resp = consumer.send(req);
- } catch (Exception e) {
- errors += 1;
-
- LOG.error("findLeader error, broker:" + host.toString() + ", will change to next broker index:" + (i + 1) % size);
- if (consumer != null) {
- consumer.close();
- consumer = null;
- }
- continue;
- }
-
- List<TopicMetadata> metaData = resp.topicsMetadata();
- for (TopicMetadata item : metaData) {
- for (PartitionMetadata part : item.partitionsMetadata()) {
- if (part.partitionId() == partition) {
- returnMetaData = part;
- break loop;
- }
- }
- }
-
- } catch (Exception e) {
- LOG.error("Error communicating with Broker:" + host.toString() + ", find Leader for partition:" + partition);
- } finally {
- if (consumer != null) {
- consumer.close();
- consumer = null;
- }
- }
- }
-
- return returnMetaData;
- }
-
- public long getOffset(String topic, int partition, long startOffsetTime) {
- SimpleConsumer simpleConsumer = findLeaderConsumer(partition);
-
- if (simpleConsumer == null) {
- LOG.error("Error consumer is null get offset from partition:" + partition);
- return -1;
- }
-
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
- OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
-
- long[] offsets = simpleConsumer.getOffsetsBefore(request).offsets(topic, partition);
- if (offsets.length > 0) {
- return offsets[0];
- } else {
- return NO_OFFSET;
- }
- }
-
- public void close() {
- if (consumer != null) {
- consumer.close();
- }
- }
-
- public SimpleConsumer getConsumer() {
- return consumer;
- }
-
- public void setConsumer(SimpleConsumer consumer) {
- this.consumer = consumer;
- }
-
- public int getStatus() {
- return status;
- }
-
- public void setStatus(int status) {
- this.status = status;
- }
-
- public Broker getLeaderBroker() {
- return leaderBroker;
- }
-
- public void setLeaderBroker(Broker leaderBroker) {
- this.leaderBroker = leaderBroker;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaMessageId.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaMessageId.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaMessageId.java
deleted file mode 100644
index a7fe8ca..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaMessageId.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-public class KafkaMessageId {
- private int partition;
- private long offset;
-
- public KafkaMessageId(int partition, long offset) {
- this.setPartition(partition);
- this.setOffset(offset);
- }
-
- public int getPartition() {
- return partition;
- }
-
- public void setPartition(int partition) {
- this.partition = partition;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
deleted file mode 100644
index 4fa11fa..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.kafka.PartitionConsumer.EmitState;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-
-public class KafkaSpout implements IRichSpout {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- private static Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-
- protected SpoutOutputCollector collector;
-
- private long lastUpdateMs;
- PartitionCoordinator coordinator;
-
- private KafkaSpoutConfig config;
-
- private ZkState zkState;
-
- public KafkaSpout() {
- config = new KafkaSpoutConfig();
- }
-
- public KafkaSpout(KafkaSpoutConfig config) {
- this.config = config;
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- // TODO Auto-generated method stub
- this.collector = collector;
- config.configure(conf);
- zkState = new ZkState(conf, config);
- coordinator = new PartitionCoordinator(conf, config, context, zkState);
- lastUpdateMs = System.currentTimeMillis();
- }
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
- zkState.close();
- }
-
- @Override
- public void activate() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void deactivate() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void nextTuple() {
- Collection<PartitionConsumer> partitionConsumers = coordinator.getPartitionConsumers();
- for(PartitionConsumer consumer: partitionConsumers) {
- EmitState state = consumer.emit(collector);
- LOG.debug("====== partition "+ consumer.getPartition() + " emit message state is "+state);
-// if(state != EmitState.EMIT_MORE) {
-// currentPartitionIndex = (currentPartitionIndex+1) % consumerSize;
-// }
-// if(state != EmitState.EMIT_NONE) {
-// break;
-// }
- }
- long now = System.currentTimeMillis();
- if((now - lastUpdateMs) > config.offsetUpdateIntervalMs) {
- commitState();
- }
-
-
- }
-
- public void commitState() {
- lastUpdateMs = System.currentTimeMillis();
- for(PartitionConsumer consumer: coordinator.getPartitionConsumers()) {
- consumer.commitState();
- }
-
- }
-
- @Override
- public void ack(Object msgId) {
- KafkaMessageId messageId = (KafkaMessageId)msgId;
- PartitionConsumer consumer = coordinator.getConsumer(messageId.getPartition());
- consumer.ack(messageId.getOffset());
- }
-
- @Override
- public void fail(Object msgId) {
- KafkaMessageId messageId = (KafkaMessageId)msgId;
- PartitionConsumer consumer = coordinator.getConsumer(messageId.getPartition());
- consumer.fail(messageId.getOffset());
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("bytes"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpoutConfig.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpoutConfig.java
deleted file mode 100644
index 86b6e69..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpoutConfig.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-import backtype.storm.spout.MultiScheme;
-import backtype.storm.spout.RawMultiScheme;
-
-
-public class KafkaSpoutConfig implements Serializable {
-
-
- private static final long serialVersionUID = 1L;
-
- public List<Host> brokers;
- public int numPartitions;
- public String topic;
- public String zkRoot;
-
- public List<Host> zkServers;
-
- public int fetchMaxBytes = 256*1024;
- public int fetchWaitMaxMs = 10000;
- public int socketTimeoutMs = 30 * 1000;
- public int socketReceiveBufferBytes = 64*1024;
- public long startOffsetTime = -1;
- public boolean fromBeginning = false;
- public String clientId;
- public boolean resetOffsetIfOutOfRange = false;
- public long offsetUpdateIntervalMs=2000;
- private Properties properties = null;
- private Map stormConf;
- public int batchSendCount = 1;
-
- public KafkaSpoutConfig() {
- }
-
- public KafkaSpoutConfig(Properties properties) {
- this.properties = properties;
- }
-
- public void configure(Map conf) {
- this.stormConf = conf;
- topic = getConfig("kafka.topic", "jstorm");
- zkRoot = getConfig("storm.zookeeper.root", "/jstorm");
-
- String zkHosts = getConfig("kafka.zookeeper.hosts", "127.0.0.1:2181");
- zkServers = convertHosts(zkHosts, 2181);
- String brokerHosts = getConfig("kafka.broker.hosts", "127.0.0.1:9092");
- brokers = convertHosts(brokerHosts, 9092);
-
- numPartitions = JStormUtils.parseInt(getConfig("kafka.broker.partitions"), 1);
- fetchMaxBytes = JStormUtils.parseInt(getConfig("kafka.fetch.max.bytes"), 256*1024);
- fetchWaitMaxMs = JStormUtils.parseInt(getConfig("kafka.fetch.wait.max.ms"), 10000);
- socketTimeoutMs = JStormUtils.parseInt(getConfig("kafka.socket.timeout.ms"), 30 * 1000);
- socketReceiveBufferBytes = JStormUtils.parseInt(getConfig("kafka.socket.receive.buffer.bytes"), 64*1024);
- fromBeginning = JStormUtils.parseBoolean(getConfig("kafka.fetch.from.beginning"), false);
- startOffsetTime = JStormUtils.parseInt(getConfig("kafka.start.offset.time"), -1);
- offsetUpdateIntervalMs = JStormUtils.parseInt(getConfig("kafka.offset.update.interval.ms"), 2000);
- clientId = getConfig("kafka.client.id", "jstorm");
- batchSendCount = JStormUtils.parseInt(getConfig("kafka.spout.batch.send.count"), 1);
- }
-
-
- private String getConfig(String key) {
- return getConfig(key, null);
- }
-
- private String getConfig(String key, String defaultValue) {
- if(properties!=null && properties.containsKey(key)) {
- return properties.getProperty(key);
- }else if(stormConf.containsKey(key)) {
- return String.valueOf(stormConf.get(key));
- }else {
- return defaultValue;
- }
- }
-
-
- public List<Host> convertHosts(String hosts, int defaultPort) {
- List<Host> hostList = new ArrayList<Host>();
- String[] hostArr = hosts.split(",");
- for (String s : hostArr) {
- Host host;
- String[] spec = s.split(":");
- if (spec.length == 1) {
- host = new Host(spec[0],defaultPort);
- } else if (spec.length == 2) {
- host = new Host(spec[0], JStormUtils.parseInt(spec[1]));
- } else {
- throw new IllegalArgumentException("Invalid host specification: " + s);
- }
- hostList.add(host);
- }
- return hostList;
- }
-
-
- public List<Host> getHosts() {
- return brokers;
- }
-
- public void setHosts(List<Host> hosts) {
- this.brokers = hosts;
- }
-
- public int getPartitionsPerBroker() {
- return numPartitions;
- }
-
- public void setPartitionsPerBroker(int partitionsPerBroker) {
- this.numPartitions = partitionsPerBroker;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
deleted file mode 100644
index 4b8ad7f..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
+++ /dev/null
@@ -1,227 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableMap;
-
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.utils.Utils;
-
-/**
- *
- * @author feilaoda
- *
- */
-public class PartitionConsumer {
- private static Logger LOG = LoggerFactory.getLogger(PartitionConsumer.class);
-
- static enum EmitState {
- EMIT_MORE, EMIT_END, EMIT_NONE
- }
-
- private int partition;
- private KafkaConsumer consumer;
-
-
- private PartitionCoordinator coordinator;
-
- private KafkaSpoutConfig config;
- private LinkedList<MessageAndOffset> emittingMessages = new LinkedList<MessageAndOffset>();
- private SortedSet<Long> pendingOffsets = new TreeSet<Long>();
- private SortedSet<Long> failedOffsets = new TreeSet<Long>();
- private long emittingOffset;
- private long lastCommittedOffset;
- private ZkState zkState;
- private Map stormConf;
-
- public PartitionConsumer(Map conf, KafkaSpoutConfig config, int partition, ZkState offsetState) {
- this.stormConf = conf;
- this.config = config;
- this.partition = partition;
- this.consumer = new KafkaConsumer(config);
- this.zkState = offsetState;
-
- Long jsonOffset = null;
- try {
- Map<Object, Object> json = offsetState.readJSON(zkPath());
- if (json != null) {
- // jsonTopologyId = (String)((Map<Object,Object>)json.get("topology"));
- jsonOffset = (Long) json.get("offset");
- }
- } catch (Throwable e) {
- LOG.warn("Error reading and/or parsing at ZkNode: " + zkPath(), e);
- }
-
- try {
- if (config.fromBeginning) {
- emittingOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.EarliestTime());
- } else {
- if (jsonOffset == null) {
- lastCommittedOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.LatestTime());
- } else {
- lastCommittedOffset = jsonOffset;
- }
- emittingOffset = lastCommittedOffset;
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- public EmitState emit(SpoutOutputCollector collector) {
- if (emittingMessages.isEmpty()) {
- fillMessages();
- }
-
- int count = 0;
- while (true) {
- MessageAndOffset toEmitMsg = emittingMessages.pollFirst();
- if (toEmitMsg == null) {
- return EmitState.EMIT_END;
- }
- count ++;
- Iterable<List<Object>> tups = generateTuples(toEmitMsg.message());
-
- if (tups != null) {
- for (List<Object> tuple : tups) {
- LOG.debug("emit message {}", new String(Utils.toByteArray(toEmitMsg.message().payload())));
- collector.emit(tuple, new KafkaMessageId(partition, toEmitMsg.offset()));
- }
- if(count>=config.batchSendCount) {
- break;
- }
- } else {
- ack(toEmitMsg.offset());
- }
- }
-
- if (emittingMessages.isEmpty()) {
- return EmitState.EMIT_END;
- } else {
- return EmitState.EMIT_MORE;
- }
- }
-
- private void fillMessages() {
-
- ByteBufferMessageSet msgs;
- try {
- long start = System.currentTimeMillis();
- msgs = consumer.fetchMessages(partition, emittingOffset + 1);
-
- if (msgs == null) {
- LOG.error("fetch null message from offset {}", emittingOffset);
- return;
- }
-
- int count = 0;
- for (MessageAndOffset msg : msgs) {
- count += 1;
- emittingMessages.add(msg);
- emittingOffset = msg.offset();
- pendingOffsets.add(emittingOffset);
- LOG.debug("fillmessage fetched a message:{}, offset:{}", msg.message().toString(), msg.offset());
- }
- long end = System.currentTimeMillis();
- LOG.info("fetch message from partition:"+partition+", offset:" + emittingOffset+", size:"+msgs.sizeInBytes()+", count:"+count +", time:"+(end-start));
- } catch (Exception e) {
- e.printStackTrace();
- LOG.error(e.getMessage(),e);
- }
- }
-
- public void commitState() {
- try {
- long lastOffset = 0;
- if (pendingOffsets.isEmpty() || pendingOffsets.size() <= 0) {
- lastOffset = emittingOffset;
- } else {
- lastOffset = pendingOffsets.first();
- }
- if (lastOffset != lastCommittedOffset) {
- Map<Object, Object> data = new HashMap<Object, Object>();
- data.put("topology", stormConf.get(Config.TOPOLOGY_NAME));
- data.put("offset", lastOffset);
- data.put("partition", partition);
- data.put("broker", ImmutableMap.of("host", consumer.getLeaderBroker().host(), "port", consumer.getLeaderBroker().port()));
- data.put("topic", config.topic);
- zkState.writeJSON(zkPath(), data);
- lastCommittedOffset = lastOffset;
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
-
- }
-
- public void ack(long offset) {
- try {
- pendingOffsets.remove(offset);
- } catch (Exception e) {
- LOG.error("offset ack error " + offset);
- }
- }
-
- public void fail(long offset) {
- failedOffsets.remove(offset);
- }
-
- public void close() {
- coordinator.removeConsumer(partition);
- consumer.close();
- }
-
- @SuppressWarnings("unchecked")
- public Iterable<List<Object>> generateTuples(Message msg) {
- Iterable<List<Object>> tups = null;
- ByteBuffer payload = msg.payload();
- if (payload == null) {
- return null;
- }
- tups = Arrays.asList(Utils.tuple(Utils.toByteArray(payload)));
- return tups;
- }
-
- private String zkPath() {
- return config.zkRoot + "/kafka/offset/topic/" + config.topic + "/" + config.clientId + "/" + partition;
- }
-
- public PartitionCoordinator getCoordinator() {
- return coordinator;
- }
-
- public void setCoordinator(PartitionCoordinator coordinator) {
- this.coordinator = coordinator;
- }
-
- public int getPartition() {
- return partition;
- }
-
- public void setPartition(int partition) {
- this.partition = partition;
- }
-
- public KafkaConsumer getConsumer() {
- return consumer;
- }
-
- public void setConsumer(KafkaConsumer consumer) {
- this.consumer = consumer;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionCoordinator.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionCoordinator.java
deleted file mode 100644
index 25dc368..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionCoordinator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.task.TopologyContext;
-
-public class PartitionCoordinator {
- private KafkaSpoutConfig config;
- private Map<Integer, PartitionConsumer> partitionConsumerMap;
- private List<PartitionConsumer> partitionConsumers;
-
- private ZkState zkState;
- public PartitionCoordinator(Map conf, KafkaSpoutConfig config, TopologyContext context, ZkState zkState) {
- this.config = config;
- this.zkState = zkState;
- partitionConsumers = new LinkedList<PartitionConsumer>();
- createPartitionConsumers(conf, context);
- }
-
- private void createPartitionConsumers(Map conf, TopologyContext context) {
- partitionConsumerMap = new HashMap<Integer, PartitionConsumer>();
- int taskSize = context.getComponentTasks(context.getThisComponentId()).size();
- for(int i=context.getThisTaskIndex(); i<config.numPartitions; i+=taskSize) {
- PartitionConsumer partitionConsumer = new PartitionConsumer(conf, config, i, zkState);
- partitionConsumers.add(partitionConsumer);
- partitionConsumerMap.put(i, partitionConsumer);
- }
- }
-
- public List<PartitionConsumer> getPartitionConsumers() {
- return partitionConsumers;
- }
-
- public PartitionConsumer getConsumer(int partition) {
- return partitionConsumerMap.get(partition);
- }
-
- public void removeConsumer(int partition) {
- PartitionConsumer partitionConsumer = partitionConsumerMap.get(partition);
- partitionConsumers.remove(partitionConsumer);
- partitionConsumerMap.remove(partition);
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/ZkState.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/ZkState.java
deleted file mode 100644
index ac512e5..0000000
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/ZkState.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package com.alibaba.jstorm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.CreateBuilder;
-import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.zookeeper.CreateMode;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZkState {
- public static final Logger LOG = LoggerFactory.getLogger(ZkState.class);
- CuratorFramework _curator;
-
- private CuratorFramework newCurator(Map conf, KafkaSpoutConfig config) throws Exception {
- String serverPorts = "";
- List<Host> zkServers = config.zkServers;
- for (Host server : zkServers) {
- serverPorts = serverPorts + server.getHost() + ":" + server.getPort() + ",";
- }
- return CuratorFrameworkFactory.newClient(serverPorts, Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), 15000, new RetryNTimes(
- Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
- }
-
- public CuratorFramework getCurator() {
- assert _curator != null;
- return _curator;
- }
-
- public ZkState(Map stateConf, KafkaSpoutConfig config) {
- try {
- _curator = newCurator(stateConf, config);
- _curator.start();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void writeJSON(String path, Map<Object, Object> data) {
- LOG.info("Writing " + path + " the data " + data.toString());
- writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8")));
- }
-
- public void writeBytes(String path, byte[] bytes) {
- try {
- if (_curator.checkExists().forPath(path) == null) {
- CreateBuilder builder = _curator.create();
- ProtectACLCreateModePathAndBytesable<String> createAble = (ProtectACLCreateModePathAndBytesable<String>) builder
- .creatingParentsIfNeeded();
- createAble.withMode(CreateMode.PERSISTENT).forPath(path, bytes);
- } else {
- _curator.setData().forPath(path, bytes);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public Map<Object, Object> readJSON(String path) {
- try {
- byte[] b = readBytes(path);
- if (b == null)
- return null;
- return (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8"));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public byte[] readBytes(String path) {
- try {
- if (_curator.checkExists().forPath(path) != null) {
- return _curator.getData().forPath(path);
- } else {
- return null;
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void close() {
- _curator.close();
- _curator = null;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-kafka/src/test/java/com/alibaba/jstorm/test/kafka/KafkaTest.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-kafka/src/test/java/com/alibaba/jstorm/test/kafka/KafkaTest.java b/jstorm-utility/jstorm-kafka/src/test/java/com/alibaba/jstorm/test/kafka/KafkaTest.java
deleted file mode 100644
index a63adec..0000000
--- a/jstorm-utility/jstorm-kafka/src/test/java/com/alibaba/jstorm/test/kafka/KafkaTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.alibaba.jstorm.test.kafka;
-
-import java.util.Properties;
-
-import kafka.server.KafkaServerStartable;
-
-import com.netflix.curator.framework.CuratorFramework;
-import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.retry.ExponentialBackoffRetry;
-import com.netflix.curator.test.TestingServer;
-
-public class KafkaTest {
- private final int port = 49123;
- private KafkaServerStartable kafka;
- private TestingServer server;
- private String zookeeperConnectionString;
-
- public KafkaTest() {}
-
- public void run() {
- try {
- server = new TestingServer();
- zookeeperConnectionString = server.getConnectString();
- ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(
- 1000, 3);
- CuratorFramework zookeeper = CuratorFrameworkFactory.newClient(
- zookeeperConnectionString, retryPolicy);
- zookeeper.start();
- Properties p = new Properties();
- p.setProperty("zookeeper.connect", zookeeperConnectionString);
- p.setProperty("broker.id", "0");
- p.setProperty("port", "" + port);
- kafka.server.KafkaConfig config = new kafka.server.KafkaConfig(p);
- kafka = new KafkaServerStartable(config);
- kafka.startup();
- } catch (Exception ex) {
- throw new RuntimeException("Could not start test broker", ex);
- }
- }
-
- public String getBrokerConnectionString() {
- return "localhost:" + port;
- }
-
- public int getPort() {
- return port;
- }
-
- public void shutdown() {
- kafka.shutdown();
- }
-
- public static void main(String[] args) {
- KafkaTest test = new KafkaTest();
- test.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/pom.xml
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/pom.xml b/jstorm-utility/jstorm-rocket-mq/pom.xml
deleted file mode 100644
index ffc611e..0000000
--- a/jstorm-utility/jstorm-rocket-mq/pom.xml
+++ /dev/null
@@ -1,94 +0,0 @@
-<?xml version="1.0"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>metaspout</artifactId>
- <version>0.2.0-SNAPSHOT</version>
-
- <properties>
- <jstorm.version>0.9.6.1</jstorm.version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-client</artifactId>
- <version>${jstorm.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-client-extension</artifactId>
- <version>${jstorm.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-server</artifactId>
- <version>${jstorm.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.4</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.taobao.metaq.final</groupId>
- <artifactId>metaq-client</artifactId>
- <version>3.1.8</version>
- </dependency>
- <!--
- <dependency>
- <groupId>com.alibaba.rocketmq</groupId>
- <artifactId>rocketmq-common</artifactId>
- <version>3.0.1</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>3.0.1</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.rocketmq</groupId>
- <artifactId>rocketmq-remoting</artifactId>
- <version>3.0.1</version>
- </dependency>
- -->
-
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaClientConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaClientConfig.java b/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaClientConfig.java
deleted file mode 100644
index f8a9c9c..0000000
--- a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaClientConfig.java
+++ /dev/null
@@ -1,263 +0,0 @@
-package com.alibaba.aloha.meta;
-
-import java.io.Serializable;
-import java.util.Date;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.TimeFormat;
-
-/**
- * Meta Spout Setting
- *
- * All needed configs must prepare before submit topopoly
- *
- * @author zhongyan.feng/zhiyuan.ls
- */
-public class MetaClientConfig implements Serializable {
-
- private static final long serialVersionUID = 4157424979688593280L;
-
- public static final String META_TOPIC = "meta.topic";
- public static final String META_CONSUMER_GROUP = "meta.consumer.group";
- public static final String META_SUBEXPRESS = "meta.subexpress";
- public static final String META_NAMESERVER = "meta.nameserver";
- //pull interval(ms) from meta server
- public static final String META_PULL_INTERVAL = "meta.pull.interval.ms";
- // max fail times
- public static final String META_MAX_FAIL_TIMES = "meta.max.fail.times";
- // meta client internal queue size
- public static final String META_INTERNAL_QUEUE_SIZE = "meta.internal.queue.size";
- // spout send one batch size
- public static final String META_BATCH_SEND_MSG_SIZE = "meta.batch.send.msg.size";
- // meta client pull batch size from meta server
- public static final String META_BATCH_PULL_MSG_SIZE = "meta.batch.pull.msg.size";
- // meta client pull thread num
- public static final String META_PULL_THREAD_NUM = "meta.pull.thread.num";
- // meta message automatically ack
- public static final String META_SPOUT_AUTO_ACK = "meta.spout.auto.ack";
- // enable meta spout flow control
- public static final String META_SPOUT_FLOW_CONTROL= "meta.spout.flow.control";
-
- // format is "yyyyMMddHHmmss"
- // set the meta client offset from this timestamp
- public static final String META_CONSUMER_START_TIMESTAMP = "meta.consumer.start.timestamp";
- public static final String META_EXTRA_PROPERTIES = "meta.extra.properties";
-
-
- private final String consumerGroup;
-
- /**
- * Alipay need set nameServer, taobao don't need set this field
- */
- private final String nameServer;
-
- private final String topic;
-
- private final String subExpress;
-
- /**
- * The max allowed failures for one single message, skip the failure message
- * if excesses
- *
- * -1 means try again until success
- */
- private int maxFailTimes = DEFAULT_FAIL_TIME;
- public static final int DEFAULT_FAIL_TIME = 5;
-
- /**
- * Local messages threshold, trigger flow control if excesses
- *
- */
- private int queueSize = DEFAULT_QUEUE_SIZE;
- public static final int DEFAULT_QUEUE_SIZE = 256;
-
- /**
- * fetch messages size from local queue
- * it is also sending batch size
- *
- */
- private int sendBatchSize = DEFAULT_BATCH_MSG_NUM;
- public static final int DEFAULT_BATCH_MSG_NUM = 32;
-
- /**
- * pull message size from meta server
- *
- */
- private int pullBatchSize = DEFAULT_BATCH_MSG_NUM;
-
- /**
- * pull interval(ms) from server for every batch
- *
- */
- private long pullInterval = 0;
-
- /**
- * pull threads num
- */
- private int pullThreadNum = DEFAULT_PULL_THREAD_NUM;
- public static int DEFAULT_PULL_THREAD_NUM = 4;
-
- /**
- * Consumer start time Null means start from the last consumption
- * time(CONSUME_FROM_LAST_OFFSET)
- *
- */
- private Date startTimeStamp;
-
- private Properties peroperties;
-
- protected MetaClientConfig(String consumerGroup, String nameServer,
- String topic, String subExpress) {
- this.consumerGroup = consumerGroup;
- this.nameServer = nameServer;
- this.topic = topic;
- this.subExpress = subExpress;
- }
-
- public MetaClientConfig(Map conf) {
- topic = (String) conf.get(META_TOPIC);
- consumerGroup = (String) conf.get(META_CONSUMER_GROUP);
- subExpress = (String) conf.get(META_SUBEXPRESS);
- if (StringUtils.isBlank((String) conf.get(META_NAMESERVER)) == false) {
- nameServer = (String) conf.get(META_NAMESERVER);
- }else {
- nameServer = null;
- }
-
- maxFailTimes = JStormUtils.parseInt(conf.get(META_MAX_FAIL_TIMES),
- DEFAULT_FAIL_TIME);
-
- queueSize = JStormUtils.parseInt(conf.get(META_INTERNAL_QUEUE_SIZE),
- DEFAULT_QUEUE_SIZE);
-
- sendBatchSize = JStormUtils.parseInt(conf.get(META_BATCH_SEND_MSG_SIZE),
- DEFAULT_BATCH_MSG_NUM);
-
- pullBatchSize = JStormUtils.parseInt(conf.get(META_BATCH_PULL_MSG_SIZE),
- DEFAULT_BATCH_MSG_NUM);
-
- pullInterval = JStormUtils.parseInt(conf.get(META_PULL_INTERVAL), 0);
-
- pullThreadNum = JStormUtils.parseInt(conf.get(META_PULL_THREAD_NUM),
- DEFAULT_PULL_THREAD_NUM);
-
- String ts = (String)conf.get(META_CONSUMER_START_TIMESTAMP);
- if (ts != null) {
- Date date = null;
- try {
- date = TimeFormat.getSecond(ts);
- }catch(Exception e) {
-
- }
-
- if (date != null) {
- startTimeStamp = date;
- }
- }
-
- Object prop = conf.get(META_EXTRA_PROPERTIES);
- if (prop != null && prop instanceof Properties) {
- peroperties = (Properties)prop;
- }
- }
-
- public static MetaClientConfig mkInstance(Map conf) {
-
- return new MetaClientConfig(conf);
- }
-
-
-
- public int getMaxFailTimes() {
- return maxFailTimes;
- }
-
- public void setMaxFailTimes(int maxFailTimes) {
- this.maxFailTimes = maxFailTimes;
- }
-
- public int getQueueSize() {
- return queueSize;
- }
-
- public void setQueueSize(int queueSize) {
- this.queueSize = queueSize;
- }
-
- public int getSendBatchSize() {
- return sendBatchSize;
- }
-
- public void setSendBatchSize(int sendBatchSize) {
- this.sendBatchSize = sendBatchSize;
- }
-
- public int getPullBatchSize() {
- return pullBatchSize;
- }
-
- public void setPullBatchSize(int pullBatchSize) {
- this.pullBatchSize = pullBatchSize;
- }
-
- public long getPullInterval() {
- return pullInterval;
- }
-
- public void setPullInterval(long pullInterval) {
- this.pullInterval = pullInterval;
- }
-
- public int getPullThreadNum() {
- return pullThreadNum;
- }
-
- public void setPullThreadNum(int pullThreadNum) {
- this.pullThreadNum = pullThreadNum;
- }
-
- public Date getStartTimeStamp() {
- return startTimeStamp;
- }
-
- public void setStartTimeStamp(Date startTimeStamp) {
- this.startTimeStamp = startTimeStamp;
- }
-
- public Properties getPeroperties() {
- return peroperties;
- }
-
- public void setPeroperties(Properties peroperties) {
- this.peroperties = peroperties;
- }
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- public String getNameServer() {
- return nameServer;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public String getSubExpress() {
- return subExpress;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaConsumerFactory.java b/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaConsumerFactory.java
deleted file mode 100644
index 52db507..0000000
--- a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaConsumerFactory.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package com.alibaba.aloha.meta;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.taobao.metaq.client.MetaHelper;
-import com.taobao.metaq.client.MetaPushConsumer;
-
-public class MetaConsumerFactory {
-
- private static final Logger LOG = Logger.getLogger(MetaConsumerFactory.class);
-
-
- private static final long serialVersionUID = 4641537253577312163L;
-
- public static Map<String, MetaPushConsumer> consumers =
- new HashMap<String, MetaPushConsumer>();
-
- public static synchronized MetaPushConsumer mkInstance(MetaClientConfig config,
- MessageListenerConcurrently listener) throws Exception{
-
- String topic = config.getTopic();
- String groupId = config.getConsumerGroup();
-
- String key = topic + "@" + groupId;
-
- MetaPushConsumer consumer = consumers.get(key);
- if (consumer != null) {
-
- LOG.info("Consumer of " + key + " has been created, don't recreate it ");
-
- //Attention, this place return null to info duplicated consumer
- return null;
- }
-
-
- StringBuilder sb = new StringBuilder();
- sb.append("Begin to init meta client \n");
- sb.append(",configuration:").append(config);
-
- LOG.info(sb.toString());
-
- consumer = new MetaPushConsumer(config.getConsumerGroup());
-
- String nameServer = config.getNameServer();
- if ( nameServer != null) {
- String namekey = "rocketmq.namesrv.domain";
-
- String value = System.getProperty(namekey);
- // this is for alipay
- if (value == null) {
-
- System.setProperty(namekey, nameServer);
- } else if (value.equals(nameServer) == false) {
- throw new Exception(
- "Different nameserver address in the same worker "
- + value + ":" + nameServer);
-
- }
- }
-
- String instanceName = groupId +"@" + JStormUtils.process_pid();
- consumer.setInstanceName(instanceName);
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- consumer.subscribe(config.getTopic(), config.getSubExpress());
- consumer.registerMessageListener(listener);
-
- consumer.setPullThresholdForQueue(config.getQueueSize());
- consumer.setConsumeMessageBatchMaxSize(config.getSendBatchSize());
- consumer.setPullBatchSize(config.getPullBatchSize());
- consumer.setPullInterval(config.getPullInterval());
- consumer.setConsumeThreadMin(config.getPullThreadNum());
- consumer.setConsumeThreadMax(config.getPullThreadNum());
-
-
- Date date = config.getStartTimeStamp() ;
- if ( date != null) {
- LOG.info("Begin to reset meta offset to " + date);
- try {
- MetaHelper.resetOffsetByTimestamp(MessageModel.CLUSTERING,
- instanceName, config.getConsumerGroup(),
- config.getTopic(), date.getTime());
- LOG.info("Successfully reset meta offset to " + date);
- }catch(Exception e) {
- LOG.error("Failed to reset meta offset to " + date);
- }
-
- }else {
- LOG.info("Don't reset meta offset ");
- }
-
- consumer.start();
-
- consumers.put(key, consumer);
- LOG.info("Successfully create " + key + " consumer");
-
-
- return consumer;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaSpout.java b/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaSpout.java
deleted file mode 100644
index e6c3a26..0000000
--- a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaSpout.java
+++ /dev/null
@@ -1,248 +0,0 @@
-package com.alibaba.aloha.meta;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import com.alibaba.jstorm.client.metric.MetricClient;
-import com.alibaba.jstorm.client.spout.IAckValueSpout;
-import com.alibaba.jstorm.client.spout.IFailValueSpout;
-import com.alibaba.jstorm.metric.JStormHistogram;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.taobao.metaq.client.MetaPushConsumer;
-
-public class MetaSpout implements IRichSpout, IAckValueSpout, IFailValueSpout,
- MessageListenerConcurrently {
- /** */
- private static final long serialVersionUID = 8476906628618859716L;
- private static final Logger LOG = Logger.getLogger(MetaSpout.class);
-
- protected MetaClientConfig metaClientConfig;
- protected SpoutOutputCollector collector;
- protected transient MetaPushConsumer consumer;
-
- protected Map conf;
- protected String id;
- protected boolean flowControl;
- protected boolean autoAck;
-
- protected transient LinkedBlockingDeque<MetaTuple> sendingQueue;
-
- protected transient MetricClient metricClient;
- protected transient JStormHistogram waithHistogram;
- protected transient JStormHistogram processHistogram;
-
- public MetaSpout() {
-
- }
-
- public void initMetricClient(TopologyContext context) {
- metricClient = new MetricClient(context);
- waithHistogram = metricClient.registerHistogram("MetaTupleWait", null);
- processHistogram = metricClient.registerHistogram("MetaTupleProcess",
- null);
- }
-
- @Override
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- this.conf = conf;
- this.collector = collector;
- this.id = context.getThisComponentId() + ":" + context.getThisTaskId();
- this.sendingQueue = new LinkedBlockingDeque<MetaTuple>();
-
- this.flowControl = JStormUtils.parseBoolean(
- conf.get(MetaClientConfig.META_SPOUT_FLOW_CONTROL), true);
- this.autoAck = JStormUtils.parseBoolean(
- conf.get(MetaClientConfig.META_SPOUT_AUTO_ACK), false);
-
- StringBuilder sb = new StringBuilder();
- sb.append("Begin to init MetaSpout:").append(id);
- sb.append(", flowControl:").append(flowControl);
- sb.append(", autoAck:").append(autoAck);
- LOG.info( sb.toString());
-
- initMetricClient(context);
-
- metaClientConfig = MetaClientConfig.mkInstance(conf);
-
- try {
- consumer = MetaConsumerFactory.mkInstance(metaClientConfig, this);
- } catch (Exception e) {
- LOG.error("Failed to create Meta Consumer ", e);
- throw new RuntimeException("Failed to create MetaConsumer" + id, e);
- }
-
- if (consumer == null) {
- LOG.warn(id
- + " already exist consumer in current worker, don't need to fetch data ");
-
- new Thread(new Runnable() {
-
- @Override
- public void run() {
- while (true) {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- break;
- }
-
- StringBuilder sb = new StringBuilder();
- sb.append("Only on meta consumer can be run on one process,");
- sb.append(" but there are mutliple spout consumes with the same topic@groupid meta, so the second one ");
- sb.append(id).append(" do nothing ");
- LOG.info(sb.toString());
- }
- }
- }).start();
- }
-
- LOG.info("Successfully init " + id);
- }
-
- @Override
- public void close() {
- if (consumer != null) {
- consumer.shutdown();
- }
-
- }
-
- @Override
- public void activate() {
- if (consumer != null) {
- consumer.resume();
- }
-
- }
-
- @Override
- public void deactivate() {
- if (consumer != null) {
- consumer.suspend();
- }
- }
-
- public void sendTuple(MetaTuple metaTuple) {
- metaTuple.updateEmitMs();
- collector.emit(new Values(metaTuple), metaTuple.getCreateMs());
- }
-
- @Override
- public void nextTuple() {
- MetaTuple metaTuple = null;
- try {
- metaTuple = sendingQueue.take();
- } catch (InterruptedException e) {
- }
-
- if (metaTuple == null) {
- return;
- }
-
- sendTuple(metaTuple);
-
- }
-
- @Deprecated
- public void ack(Object msgId) {
- LOG.warn("Shouldn't go this function");
- }
-
- @Deprecated
- public void fail(Object msgId) {
- LOG.warn("Shouldn't go this function");
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("MetaTuple"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- @Override
- public void fail(Object msgId, List<Object> values) {
- MetaTuple metaTuple = (MetaTuple) values.get(0);
- AtomicInteger failTimes = metaTuple.getFailureTimes();
-
- int failNum = failTimes.incrementAndGet();
- if (failNum > metaClientConfig.getMaxFailTimes()) {
- LOG.warn("Message " + metaTuple.getMq() + " fail times " + failNum);
- finishTuple(metaTuple);
- return;
- }
-
- if (flowControl) {
- sendingQueue.offer(metaTuple);
- } else {
- sendTuple(metaTuple);
- }
- }
-
- public void finishTuple(MetaTuple metaTuple) {
- waithHistogram.update(metaTuple.getEmitMs() - metaTuple.getCreateMs());
- processHistogram.update(System.currentTimeMillis() - metaTuple.getEmitMs());
- metaTuple.done();
- }
-
- @Override
- public void ack(Object msgId, List<Object> values) {
- MetaTuple metaTuple = (MetaTuple) values.get(0);
- finishTuple(metaTuple);
- }
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- try {
- MetaTuple metaTuple = new MetaTuple(msgs, context.getMessageQueue());
-
- if (flowControl) {
- sendingQueue.offer(metaTuple);
- } else {
- sendTuple(metaTuple);
- }
-
- if (autoAck) {
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- } else {
- metaTuple.waitFinish();
- if (metaTuple.isSuccess() == true) {
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- } else {
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
-
- } catch (Exception e) {
- LOG.error("Failed to emit " + id, e);
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
-
- }
-
- public MetaPushConsumer getConsumer() {
- return consumer;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaTuple.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaTuple.java b/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaTuple.java
deleted file mode 100644
index d735749..0000000
--- a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaTuple.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.alibaba.aloha.meta;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-public class MetaTuple implements Serializable {
-
- /** */
- private static final long serialVersionUID = 2277714452693486955L;
-
- protected final List<MessageExt> msgList;
- protected final MessageQueue mq;
-
- protected final AtomicInteger failureTimes;
- protected final long createMs;
- protected long emitMs;
-
- protected transient CountDownLatch latch;
- protected transient boolean isSuccess;
-
- public MetaTuple(List<MessageExt> msgList, MessageQueue mq) {
- this.msgList = msgList;
- this.mq = mq;
-
- this.failureTimes = new AtomicInteger(0);
- this.createMs = System.currentTimeMillis();
-
- this.latch = new CountDownLatch(1);
- this.isSuccess = false;
- }
-
- public AtomicInteger getFailureTimes() {
- return failureTimes;
- }
-
- public long getCreateMs() {
- return createMs;
- }
-
- public long getEmitMs() {
- return emitMs;
- }
-
- public void updateEmitMs() {
- this.emitMs = System.currentTimeMillis();
- }
-
- public List<MessageExt> getMsgList() {
- return msgList;
- }
-
-
- public MessageQueue getMq() {
- return mq;
- }
-
- public boolean waitFinish() throws InterruptedException {
- return latch.await(4, TimeUnit.HOURS);
- }
-
- public void done() {
- isSuccess = true;
- latch.countDown();
- }
-
- public void fail() {
- isSuccess = false;
- latch.countDown();
- }
-
- public boolean isSuccess() {
- return isSuccess;
- }
-
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/TestTopology.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/TestTopology.java b/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/TestTopology.java
deleted file mode 100644
index 608b54c..0000000
--- a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/TestTopology.java
+++ /dev/null
@@ -1,150 +0,0 @@
-package com.alibaba.aloha.meta.example;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.log4j.Logger;
-import org.yaml.snakeyaml.Yaml;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-
-import com.alibaba.aloha.meta.MetaSpout;
-import com.alibaba.jstorm.utils.JStormUtils;
-
-/**
- * MonitorTopology
- *
- * @author longda/zhiyuan.ls
- *
- */
-public class TestTopology {
-
- private static Logger LOG = Logger.getLogger(TestTopology.class);
-
- public static String WRITER_COMPONENT = "writer";
-
- public static void main(String[] args) throws Exception {
- if (args.length == 0) {
- System.err.println("Please input configuration file");
- System.exit(-1);
- }
-
- LoadConf(args[0]);
-
- TopologyBuilder builder = setupBuilder();
-
- submitTopology(builder);
-
- }
-
- private static TopologyBuilder setupBuilder() throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
-
- int writerParallel = JStormUtils.parseInt(
- conf.get("topology.writer.parallel"), 1);
-
- int spoutParallel = JStormUtils.parseInt(
- conf.get("topology.spout.parallel"), 1);
-
- builder.setSpout("MetaSpout", new MetaSpout(), spoutParallel);
-
- builder.setBolt(WRITER_COMPONENT, new WriterBolt(), writerParallel)
- .shuffleGrouping("MetaSpout");
-
- return builder;
- }
-
- private static void submitTopology(TopologyBuilder builder) {
- try {
- if (local_mode(conf)) {
-
- LocalCluster cluster = new LocalCluster();
-
- cluster.submitTopology(
- String.valueOf(conf.get("topology.name")), conf,
- builder.createTopology());
-
- Thread.sleep(200000);
-
- cluster.shutdown();
- } else {
- StormSubmitter.submitTopology(
- String.valueOf(conf.get("topology.name")), conf,
- builder.createTopology());
- }
-
- } catch (Exception e) {
- LOG.error(e.getMessage(), e.getCause());
- }
- }
-
- private static Map conf = new HashMap<Object, Object>();
-
- private static void LoadProperty(String prop) {
- Properties properties = new Properties();
-
- try {
- InputStream stream = new FileInputStream(prop);
- properties.load(stream);
- } catch (FileNotFoundException e) {
- System.out.println("No such file " + prop);
- } catch (Exception e1) {
- e1.printStackTrace();
-
- return;
- }
-
- conf.putAll(properties);
- }
-
- private static void LoadYaml(String confPath) {
-
- Yaml yaml = new Yaml();
-
- try {
- InputStream stream = new FileInputStream(confPath);
-
- conf = (Map) yaml.load(stream);
- if (conf == null || conf.isEmpty() == true) {
- throw new RuntimeException("Failed to read config file");
- }
-
- } catch (FileNotFoundException e) {
- System.out.println("No such file " + confPath);
- throw new RuntimeException("No config file");
- } catch (Exception e1) {
- e1.printStackTrace();
- throw new RuntimeException("Failed to read config file");
- }
-
- return;
- }
-
- private static void LoadConf(String arg) {
- if (arg.endsWith("yaml")) {
- LoadYaml(arg);
- } else {
- LoadProperty(arg);
- }
- }
-
- public static boolean local_mode(Map conf) {
- String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
- if (mode != null) {
- if (mode.equals("local")) {
- return true;
- }
- }
-
- return false;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/WriterBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/WriterBolt.java b/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/WriterBolt.java
deleted file mode 100644
index 5eddef9..0000000
--- a/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/example/WriterBolt.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package com.alibaba.aloha.meta.example;
-
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-
-import com.alibaba.aloha.meta.MetaTuple;
-
-
-public class WriterBolt implements IRichBolt {
-
- private static final long serialVersionUID = 2495121976857546346L;
-
- private static final Logger LOG = Logger.getLogger(WriterBolt.class);
-
- protected OutputCollector collector;
-
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- this.collector = collector;
-
- }
-
- public void execute(Tuple tuple) {
- // TODO Auto-generated method stub
- MetaTuple metaTuple = (MetaTuple)tuple.getValue(0);
-
- try {
- LOG.info("Messages:" + metaTuple);
-
- } catch (Exception e) {
- collector.fail(tuple);
- return ;
- //throw new FailedException(e);
- }
-
- collector.ack(tuple);
- }
-
- public void cleanup() {
- // TODO Auto-generated method stub
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // TODO Auto-generated method stub
-
- }
-
- public Map<String, Object> getComponentConfiguration() {
- // TODO Auto-generated method stub
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/MANIFEST.MF
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/MANIFEST.MF b/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/MANIFEST.MF
deleted file mode 100644
index 458dd09..0000000
--- a/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/MANIFEST.MF
+++ /dev/null
@@ -1,5 +0,0 @@
-Manifest-Version: 1.0
-Built-By: basti.lj
-Build-Jdk: 1.6.0_45
-Created-By: Maven Integration for Eclipse
-
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.properties
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.properties b/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.properties
deleted file mode 100644
index ad01a91..0000000
--- a/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.properties
+++ /dev/null
@@ -1,7 +0,0 @@
-#Generated by Maven Integration for Eclipse
-#Mon Nov 03 15:24:06 CST 2014
-version=0.2.0-SNAPSHOT
-groupId=com.alibaba.jstorm
-m2e.projectName=metaspout
-m2e.projectLocation=D\:\\code\\aloha_branch\\github_master\\jstorm\\jstorm-utility\\jstorm-rocket-mq
-artifactId=metaspout
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.xml
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.xml b/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.xml
deleted file mode 100644
index ffc611e..0000000
--- a/jstorm-utility/jstorm-rocket-mq/target/classes/META-INF/maven/com.alibaba.jstorm/metaspout/pom.xml
+++ /dev/null
@@ -1,94 +0,0 @@
-<?xml version="1.0"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>metaspout</artifactId>
- <version>0.2.0-SNAPSHOT</version>
-
- <properties>
- <jstorm.version>0.9.6.1</jstorm.version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-client</artifactId>
- <version>${jstorm.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-client-extension</artifactId>
- <version>${jstorm.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.alibaba.jstorm</groupId>
- <artifactId>jstorm-server</artifactId>
- <version>${jstorm.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.4</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.taobao.metaq.final</groupId>
- <artifactId>metaq-client</artifactId>
- <version>3.1.8</version>
- </dependency>
- <!--
- <dependency>
- <groupId>com.alibaba.rocketmq</groupId>
- <artifactId>rocketmq-common</artifactId>
- <version>3.0.1</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>3.0.1</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.rocketmq</groupId>
- <artifactId>rocketmq-remoting</artifactId>
- <version>3.0.1</version>
- </dependency>
- -->
-
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/jstorm-rocket-mq/test/main/resources/metaspout.yaml
----------------------------------------------------------------------
diff --git a/jstorm-utility/jstorm-rocket-mq/test/main/resources/metaspout.yaml b/jstorm-utility/jstorm-rocket-mq/test/main/resources/metaspout.yaml
deleted file mode 100644
index f007772..0000000
--- a/jstorm-utility/jstorm-rocket-mq/test/main/resources/metaspout.yaml
+++ /dev/null
@@ -1,32 +0,0 @@
-
-#Meta Client Configuration
-# Please refer MetaClientConfig for every setting's details
-meta.topic: "bbl_user"
-meta.consumer.group: "bbl_user"
-meta.subexpress: "*"
-#meta.nameserver: ""
-#meta.pull.interval.ms: 0
-#meta.max.fail.times: 5
-#meta.internal.queue.size: 256
-#meta.batch.send.msg.size: 16
-#meta.batch.pull.msg.size: 32
-#meta.pull.thread.num: 4
-#meta.spout.auto.ack: false
-#meta.spout.flow.contro: true
-#yyyyMMddHHmmss
-meta.consumer.start.timestamp: "20141011000000"
-#meta.extra.properties:
-
-topology.name: test_meta_spout
-topology.version: 1.0.0
-topology.workers: 5
-topology.max.spout.pending: 10
-topology.acker.executors: 1
-
-topology.debug: false
-topology.debug.recv.tuple: false
-storm.cluster.mode: local
-
-topology.spout.parallel: 2
-topology.writer.parallel: 1
-
http://git-wip-us.apache.org/repos/asf/storm/blob/e8f64d5e/jstorm-utility/ons/conf/ons.yaml
----------------------------------------------------------------------
diff --git a/jstorm-utility/ons/conf/ons.yaml b/jstorm-utility/ons/conf/ons.yaml
deleted file mode 100644
index 5736422..0000000
--- a/jstorm-utility/ons/conf/ons.yaml
+++ /dev/null
@@ -1,49 +0,0 @@
-#############################################################
-###################### ONS Setting Begin ####################
-Topic: "longdatest"
-SubExpress: "*"
-AccessKey: null
-SecretKey: null
-
-ConsumerId: "CID-LONGDA-123"
-ConsumeThreadNums: 4
-ProducerId: "PID_25770293805-101"
-
-#SendMsgTimeoutMillis:
-#MessageModel:
-#ONSAddr:
-#NAMESRV_ADDR:
-
-###################### ONS Setting End ######################
-#############################################################
-
-
-#############################################################
-############### JStorm Topology Setting Begin ###############
-
-#ons spout enable flow control setting
-# all message will be sent in Spout.nextTuple
-OnsSpoutFlowControl: true
-
-# spout consume one with autoAck mode
-# if disable, consumer offset won't move on until do spout.ack
-OnsSpoutAutoAck: false
-
-# if one message fail times is bigger than the OnsMsgMaxFailTimes
-# it will be thrown
-OnsMsgMaxFailTimes: 5
-
-topology.name: "ons_test"
-topology.consumer.parallel: 1
-topology.producer.parallel: 1
-worker.memory.size: 2147483648
-topology.workers: 1
-topology.acker.executors: 0
-storm.cluster.mode: "local"
-
-############### JStorm Topology Setting End ###############
-#############################################################
-
-
-
-