Posted to user@storm.apache.org by MapleFalling <ma...@gmail.com> on 2014/01/12 09:41:37 UTC
Can anyone help check my application for URL count statistics?
Hi,
I am trying to use Storm Trident to count URLs over a large volume of URL data
and save the counts into MongoDB. I use Redis to feed the data in, and I wrote
a Redis transactional spout to get the data into Storm. Since I need 'exactly
once' semantics, I use Trident. But when I submit my topology to Storm, I see
two components fetching data from Redis: one is
$mastercoord-bg0<http://10.5.140.132:8080/topology/tridentWCount-46-1389489118/component/%24mastercoord-bg0>
and the other is
$spoutcoord-spout0<http://10.5.140.132:8080/topology/tridentWCount-46-1389489118/component/%24spoutcoord-spout0>.
I do not quite understand why there are two spouts fetching data from Redis,
and it makes my counts wrong.
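By the way, my understanding of the ITridentSpout contract (please correct me
if I am wrong) is that the BatchCoordinator is only supposed to compute
metadata describing each batch, and the Emitter is the only component that
should actually read the source data, so that replaying a txid re-emits
exactly the same batch. Here is a minimal sketch of that split, with
illustrative names (this is not my real code; fetchRange is a hypothetical
source read):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;

class SketchSpout implements ITridentSpout<Long> {
    private static final long BATCH_SIZE = 100L;

    class SketchCoordinator implements BatchCoordinator<Long> {
        @Override
        public Long initializeTransaction(long txid, Long prevMetadata) {
            // Metadata only: "this batch starts at this offset".
            // The coordinator never touches the data source itself.
            return prevMetadata == null ? 0L : prevMetadata + BATCH_SIZE;
        }

        @Override
        public void success(long txid) {
        }

        @Override
        public boolean isReady(long txid) {
            return true;
        }

        @Override
        public void close() {
        }
    }

    class SketchEmitter implements Emitter<Long> {
        @Override
        public void emitBatch(TransactionAttempt tx, Long coordinatorMeta,
                TridentCollector collector) {
            // Only the emitter fetches data, using the coordinator's metadata,
            // so replaying the same txid re-emits exactly the same tuples.
            for (String msg : fetchRange(coordinatorMeta, BATCH_SIZE)) {
                collector.emit(new Values(msg));
            }
        }

        @Override
        public void success(TransactionAttempt tx) {
        }

        @Override
        public void close() {
        }
    }

    // Hypothetical source read: return the messages stored for
    // [offset, offset + size). A real implementation would read from a
    // replayable store.
    private List<String> fetchRange(Long offset, long size) {
        return new ArrayList<String>();
    }

    @Override
    public BatchCoordinator<Long> getCoordinator(String txStateId, Map conf,
            TopologyContext context) {
        return new SketchCoordinator();
    }

    @Override
    public Emitter<Long> getEmitter(String txStateId, Map conf,
            TopologyContext context) {
        return new SketchEmitter();
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("url");
    }
}

In my real code, the coordinator itself drains the Redis queue, so maybe that
is why both $mastercoord-bg0 and $spoutcoord-spout0 show Redis traffic.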
Here is my actual code; I hope someone can check it for me.
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;

public class RedisPubSubTransactionalSpout implements ITridentSpout<String> {
    static final long serialVersionUID = 737015318988609461L;
    static Logger LOG = Logger.getLogger(RedisPubSubTransactionalSpout.class);
    static final String REDIS_EMITTEDMSG = "emittedMsgs";
    SpoutOutputCollector _collector;
    final String host;
    final int port;
    final String pattern;
    LinkedBlockingQueue<String> queue;
    JedisPool pool;
    private final String singleOutputFieldName;
    private Long count2storm = 0L;
    private Long countFromRedis = 0L;

    public RedisPubSubTransactionalSpout(String host, int port, String pattern,
            String singleOutputFieldName) {
        this.host = host;
        this.port = port;
        this.pattern = pattern;
        this.singleOutputFieldName = singleOutputFieldName;
        this.getLogger().debug("Set up RedisClient");
    }

    private Logger getLogger() {
        LOG.setLevel(Level.ALL);
        return LOG;
    }

    protected void initIfNeeded() {
        if (pool == null) {
            queue = new LinkedBlockingQueue<String>(1000);
            pool = new JedisPool(new JedisPoolConfig(), host, port);
            ListenerThread listener = new ListenerThread(queue, pool, pattern);
            listener.start();
            getLogger().debug("Start redis reader process");
        }
    }

    // Background thread that subscribes to the Redis channel and buffers
    // incoming messages in the queue.
    class ListenerThread extends Thread {
        LinkedBlockingQueue<String> queue;
        JedisPool pool;
        String pattern;

        public ListenerThread(LinkedBlockingQueue<String> queue, JedisPool pool,
                String pattern) {
            this.queue = queue;
            this.pool = pool;
            this.pattern = pattern;
        }

        public void run() {
            JedisPubSub listener = new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    // getLogger().debug("redis spout get message from redis " + message);
                    queue.offer(message);
                    countFromRedis++;
                    getLogger().debug("on message, total from redis:" + countFromRedis);
                }

                @Override
                public void onPMessage(String pattern, String channel, String message) {
                    // getLogger().debug("redis spout get pmessage from redis " + message);
                    queue.offer(message);
                    countFromRedis++;
                    getLogger().debug("on pmessage, total from redis:" + countFromRedis);
                }

                @Override
                public void onPSubscribe(String channel, int subscribedChannels) {
                }

                @Override
                public void onPUnsubscribe(String channel, int subscribedChannels) {
                }

                @Override
                public void onSubscribe(String channel, int subscribedChannels) {
                }

                @Override
                public void onUnsubscribe(String channel, int subscribedChannels) {
                }
            };
            Jedis jedis = pool.getResource();
            try {
                // jedis.psubscribe(listener, pattern);
                jedis.subscribe(listener, pattern);
            } finally {
                pool.returnResource(jedis);
            }
        }
    }

    private class Coordinator implements BatchCoordinator<String> {
        private final long batchSize;

        public Coordinator(long batchSize) {
            this.batchSize = batchSize;
            initIfNeeded();
        }

        @Override
        public String initializeTransaction(long txid, String prevMetaData) {
            // Drain up to batchSize messages from the queue and stash the batch
            // in a Redis hash keyed by txid, so the emitter can replay it.
            String emittedId = UUID.randomUUID().toString();
            Jedis jedisClient = null;
            StringBuffer emittedMsg = new StringBuffer();
            Long txId = txid;
            try {
                jedisClient = pool.getResource();
                getLogger().debug("init transaction!");
                Long _count = 0L;
                for (int idx = 0; idx < batchSize; idx++) {
                    String ret = queue.poll();
                    if (ret == null) {
                        Utils.sleep(50);
                    } else {
                        getLogger().debug("coordinator get message from redis " + ret);
                        emittedMsg.append(ret).append("\n");
                        _count++;
                    }
                }
                String _emitMsgStr = emittedMsg.toString();
                getLogger().debug("emit Msg in coordinator:" + _emitMsgStr);
                if ((jedisClient != null) && (_emitMsgStr != null) && (!_emitMsgStr.isEmpty())) {
                    getLogger().debug("Put emit message into redis with txId:" + txId);
                    jedisClient.hset(REDIS_EMITTEDMSG, txId.toString(), emittedMsg.toString());
                }
                // save into mongoDB
                /* BasicDBObject _total = (BasicDBObject) MongoDB.INSTANCE.getMongoDBCol2().findOne();
                if (_total == null) {
                    _total = new BasicDBObject("t", 0);
                }
                Long count = (Long) _total.get("t");
                if (count == 0L || count == null) {
                    count = 1L;
                } else {
                    count = count + _count;
                }
                _total.put("t", count);
                MongoDB.INSTANCE.getMongoDBCol2().save(_total); */
                count2storm = count2storm + _count;
                getLogger().debug("total to storm:" + count2storm);
            } catch (Exception e) {
                getLogger().debug("error in init transaction!" + e.getMessage());
            } finally {
                if (jedisClient != null)
                    pool.returnResource(jedisClient);
                return emittedId;
            }
        }

        @Override
        public void success(long txid) {
            /* Long txId = txid;
            Jedis jedisClient = null;
            try {
                jedisClient = pool.getResource();
                // remove successfully handled message
                if (jedisClient != null) {
                    getLogger().debug("remove transaction :" + txId);
                    jedisClient.hdel(REDIS_EMITTEDMSG, txId.toString());
                }
            } catch (Exception e) {
            } finally {
                if (jedisClient != null)
                    pool.returnResource(jedisClient);
            } */
        }

        @Override
        public boolean isReady(long txid) {
            return pool != null;
        }

        @Override
        public void close() {
            pool.destroy();
        }
    }

    private class TheEmitter implements Emitter<String> {
        @Override
        public void emitBatch(TransactionAttempt tx, String coordinatorMeta,
                TridentCollector collector) {
            // Re-read the batch the coordinator stashed in Redis for this txid
            // and emit one tuple per message.
            Jedis jedisClient = null;
            initIfNeeded();
            try {
                jedisClient = pool.getResource();
                String msgData = jedisClient.hget(REDIS_EMITTEDMSG,
                        tx.getTransactionId().toString());
                getLogger().debug("emitter get message from redis " + msgData);
                if (msgData != null) {
                    String[] msgs = msgData.split("\n");
                    for (String msg : msgs) {
                        getLogger().debug("get message from emittedMsgs :" + msg);
                        collector.emit(new Values(msg));
                    }
                }
            } catch (Exception e) {
                getLogger().debug("Emit message error" + e.getMessage());
            } finally {
                if (null != jedisClient)
                    pool.returnResource(jedisClient);
            }
        }

        @Override
        public void success(TransactionAttempt tx) {
            // Batch fully committed: drop the stashed batch from Redis.
            Long txId = tx.getTransactionId();
            initIfNeeded();
            Jedis jedisClient = null;
            try {
                jedisClient = pool.getResource();
                if (jedisClient != null) {
                    getLogger().debug("remove transaction :" + txId);
                    jedisClient.hdel(REDIS_EMITTEDMSG, txId.toString());
                }
            } catch (Exception e) {
            } finally {
                if (jedisClient != null)
                    pool.returnResource(jedisClient);
            }
        }

        @Override
        public void close() {
            // NOP
        }
    }

    @Override
    public BatchCoordinator<String> getCoordinator(String txStateId, Map conf,
            TopologyContext context) {
        return new Coordinator(100L);
    }

    @Override
    public Emitter<String> getEmitter(String txStateId, Map conf,
            TopologyContext context) {
        return new TheEmitter();
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(singleOutputFieldName);
    }
}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import com.mongodb.BasicDBObject;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.Aggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.state.BaseStateUpdater;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.tuple.TridentTuple;

// MongoDB is my own singleton wrapper around the Mongo driver (not shown here).
public class TridentURLCount {
    public static final class Constants {
        public static final String RedisHost = "abc.abc.abc.abc";
        public static final int RedisPort = 6379;
        public static final String Pattern = "stormURL";
        public static final String OutputFieldName = "url";
        // MongoDB field names
        public static final String urlHash = "h";
        public static final String url = "u";
        public static final String urlCount = "c";
        public static final String aveTTL = "at";
        public static final String txId = "tid";
    }

    public static class URLStat implements Serializable {
        private String hash;
        private String url;
        private Long avgTTL = 0L;
        private Long count = 0L;

        public String getHash() {
            return hash;
        }

        public void setHash(String hash) {
            this.hash = hash;
        }

        public String getUrl() {
            return url;
        }

        public void setUrl(String url) {
            this.url = url;
        }

        public Long getCount() {
            return count;
        }

        public Long getAvgTTL() {
            return this.avgTTL;
        }

        // Fold another partial average (avgTTL over count samples) into this one.
        public void mergeAvg(Long avgTTL, Long count) {
            this.avgTTL = ((this.avgTTL * this.count) + avgTTL * count) / (this.count + count);
            this.count = this.count + count;
        }
    }

    public static class SumDB implements State {
        private Long txId;
        private static final Logger LOG = Logger.getLogger(SumDB.class);
        static {
            LOG.setLevel(Level.ALL);
        }

        private Logger getLogger() {
            return LOG;
        }

        public void beginCommit(Long txid) {
            this.txId = txid;
        }

        public void commit(Long txid) {
        }

        public void update(Map<String, URLStat> data) {
            MongoDB.INSTANCE.setupMongoDB();
            this.getLogger().debug("Start to save data to MongoDB; here we need exactly-once semantics");
            for (Map.Entry<String, URLStat> entry : data.entrySet()) {
                String key = entry.getKey();
                URLStat val = entry.getValue();
                BasicDBObject stormStat = new BasicDBObject(Constants.urlHash, key);
                BasicDBObject _dbStat = (BasicDBObject) MongoDB.INSTANCE.getMongoDBCol().findOne(stormStat);
                if (_dbStat == null) {
                    _dbStat = stormStat;
                }
                // Only apply batches with a newer txid than the one already stored,
                // so a replayed batch is not counted twice.
                Long _txId = (Long) _dbStat.get(Constants.txId);
                if (_txId == null || _txId < this.txId) {
                    Long _count = (Long) _dbStat.get(Constants.urlCount);
                    if (_count == null)
                        _count = 0L;
                    _count = _count + val.getCount();
                    _dbStat.put(Constants.url, val.getUrl());
                    _dbStat.put(Constants.urlCount, _count);
                    _dbStat.put(Constants.aveTTL, val.getAvgTTL());
                    _dbStat.put(Constants.txId, this.txId);
                    MongoDB.INSTANCE.getMongoDBCol().save(_dbStat);
                    this.getLogger().debug("Save stat into mongoDB for url " + val.getUrl());
                }
            }
        }
    }

    public static class SumDBFactory implements StateFactory {
        public State makeState(Map a_conf, IMetricsContext a_context,
                int a_partitionIndex, int a_numPartitions) {
            return new SumDB();
        }
    }

    public static class SumUpdater extends BaseStateUpdater<SumDB> {
        public void updateState(SumDB state, List<TridentTuple> tuples,
                TridentCollector collector) {
            Map<String, URLStat> counts = new HashMap<String, URLStat>();
            for (TridentTuple tuple : tuples) {
                URLStat _obj = (URLStat) tuple.getValue(1);
                counts.put(_obj.getHash(), _obj);
            }
            state.update(counts);
        }
    }

    // First-stage aggregator: parses the raw CSV lines and builds per-partition
    // partial URLStat aggregates.
    public static class localCombiner implements Aggregator<Map<String, URLStat>> {
        int partitionId;
        private static final Logger LOG = Logger.getLogger(localCombiner.class);

        private Logger getLogger() {
            LOG.setLevel(Level.ALL);
            return LOG;
        }

        @SuppressWarnings("rawtypes")
        @Override
        public void prepare(Map conf, TridentOperationContext context) {
            this.partitionId = context.getPartitionIndex();
        }

        @Override
        public void cleanup() {
        }

        @Override
        public Map<String, URLStat> init(Object batchId, TridentCollector collector) {
            return new HashMap<String, URLStat>();
        }

        @Override
        public void aggregate(Map<String, URLStat> val, TridentTuple tuple,
                TridentCollector collector) {
            String dataStr = tuple.getString(0);
            this.getLogger().debug("Get Data from source:" + dataStr);
            if (dataStr != null && !dataStr.isEmpty()) {
                // Expected line format: hash,url,...,ttl (ttl in column 5)
                String[] dataArr = dataStr.split(",");
                String urlHash = dataArr[0];
                String url = dataArr[1];
                Long ttl = Long.parseLong(dataArr[5], 10);
                URLStat _obj = val.get(urlHash);
                if (_obj == null) {
                    _obj = new URLStat();
                    _obj.setHash(urlHash);
                    _obj.setUrl(url);
                }
                _obj.mergeAvg(ttl, 1L);
                val.put(_obj.getHash(), _obj);
            }
        }

        @Override
        public void complete(Map<String, URLStat> val, TridentCollector collector) {
            this.getLogger().debug("In localCombiner complete");
            for (Map.Entry<String, URLStat> entry : val.entrySet()) {
                String key = entry.getKey();
                URLStat _val = entry.getValue();
                this.getLogger().debug("emit:" + _val.getHash());
                collector.emit(new Values(key, _val));
            }
        }
    }

    // Second-stage aggregator: merges the partial URLStat aggregates per hash
    // after the groupBy.
    public static class localCombiner2 implements Aggregator<Map<String, URLStat>> {
        int partitionId;
        private static final Logger LOG = Logger.getLogger(localCombiner2.class);

        private Logger getLogger() {
            LOG.setLevel(Level.ALL);
            return LOG;
        }

        @SuppressWarnings("rawtypes")
        @Override
        public void prepare(Map conf, TridentOperationContext context) {
            this.partitionId = context.getPartitionIndex();
        }

        @Override
        public void cleanup() {
        }

        @Override
        public Map<String, URLStat> init(Object batchId, TridentCollector collector) {
            return new HashMap<String, URLStat>();
        }

        @Override
        public void aggregate(Map<String, URLStat> val, TridentTuple tuple,
                TridentCollector collector) {
            this.getLogger().debug("In localCombiner2 aggregate");
            String urlHash = tuple.getString(0);
            URLStat _tupleStat = (URLStat) tuple.getValue(1);
            String url = _tupleStat.getUrl();
            URLStat _obj = val.get(urlHash);
            if (_obj == null) {
                _obj = new URLStat();
                _obj.setHash(urlHash);
                _obj.setUrl(url);
            }
            _obj.mergeAvg(_tupleStat.getAvgTTL(), _tupleStat.getCount());
            val.put(_obj.getHash(), _obj);
        }

        @Override
        public void complete(Map<String, URLStat> val, TridentCollector collector) {
            this.getLogger().debug("In localCombiner2 complete");
            for (Map.Entry<String, URLStat> entry : val.entrySet()) {
                String key = entry.getKey();
                URLStat _val = entry.getValue();
                collector.emit(new Values(key, _val));
                this.getLogger().debug("In localCombiner2 emit!:" + key);
            }
        }
    }

    public static StormTopology buildTopology(LocalDRPC drpc) {
        RedisPubSubTransactionalSpout spout = new RedisPubSubTransactionalSpout(
                Constants.RedisHost,
                Constants.RedisPort,
                Constants.Pattern,
                Constants.OutputFieldName);
        TridentTopology topology = new TridentTopology();
        TridentState wordCounts = topology.newStream("spout1", spout).shuffle()
                .aggregate(new Fields(Constants.OutputFieldName),
                        new localCombiner(), new Fields("hash", "urlstat"))
                .parallelismHint(16)
                .groupBy(new Fields("hash"))
                .aggregate(new Fields("hash", "urlstat"),
                        new localCombiner2(), new Fields("hash1", "urlstat1"))
                .parallelismHint(16)
                .partitionPersist(new SumDBFactory(),
                        new Fields("hash1", "urlstat1"), new SumUpdater())
                .parallelismHint(1);
        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        conf.setDebug(true);
        conf.put(Config.NIMBUS_HOST, "10.5.140.132");
        ArrayList<String> zookeeperSrvrs = new ArrayList<String>();
        zookeeperSrvrs.add("10.5.140.131");
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, zookeeperSrvrs);
        // Config.setMaxSpoutPending(conf, 5000);
        conf.put(Config.TOPOLOGY_DEBUG, true);
        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 120);
        if (args.length == 0) {
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
            for (int i = 0; i < 100; i++) {
                System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
                Thread.sleep(1000);
            }
        } else {
            conf.setNumWorkers(6);
            StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
        }
    }
}
--
Maplefalling