Posted to user@storm.apache.org by david kavanagh <da...@hotmail.com> on 2016/03/29 18:33:43 UTC

Storm KafkaSpout Integration

Hi all,
I am currently trying to use TestTopologyStaticHosts to connect the KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance running on my localhost. I have a topic named "twitter-topic" that has some tweets in it. This is all working as expected. I can run the consumer in the terminal and it returns the tweets. I want to use the KafkaSpout to connect to the Kafka topic and pull the tweets into a topology. I have been working on this for a few days now with no success.
So far i have learned that when Storm is run in local mode it uses an in-memory ZooKeeper on port 2000, which would not allow it to connect to the Kafka topic. I have tried to get around this using the following syntax that i found online:
LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
It is still not working but it seems to be connecting to Kafka as it gives a 'closed socket connection' message when i cancel the operation (after it does not work and hangs open). It also says in the storm output that it is connected to localhost 2181 so it seems to be getting that far. I have included the full output from Storm in a txt file attached.
Here is the code i am using in the TestTopologyStaticHosts class:
 public static void main(String[] args) throws Exception {
        //String zkConnString = "localhost:2181";
        GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
        hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
        BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
        // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");

        SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "twitter-topic", "/twitter", "twitter-topic-id");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        //kafkaConfig.forceStartOffsetTime(-2);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
        builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
        LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
        Config config = new Config();
        config.setDebug(true);
        // config.put("storm.zookeeper.servers", "localhost");
        // config.put("storm.zookeeper.port", "2181");
        cluster.submitTopology("kafka-test", config, builder.createTopology());
        Thread.sleep(600000);
    }
Judging by the output it seems that there is a problem with connecting to the Kafka partitions. I have tried many different things to get it to work but no luck. I have also been looking at using the KafkaSpoutTestTopology class but it expects arguments including 'dockerIp', which i don't understand.
Should i be using Storm in local mode? Should i be using the TestTopologyStaticHosts class or would the KafkaSpoutTestTopology class be better?
Any help at all would be greatly appreciated because i am really stuck.
Kind Regards
David Kavanagh

Re: Storm KafkaSpout Integration

Posted by John Reilly <jr...@inconspicuous.org>.
Those logs suggest that it is working, but that it has started to consume
from the end of the topic.

'Read partition information from: /twitter/twitter-topic-id/partition_0 -->
null' indicates that there were no consumer offsets in zookeeper, which is
what you would expect the first time that it is run.

Right after that, you see
37915 [Thread-11-words] INFO storm.kafka.PartitionManager - No partition
information found, using configuration to determine offset
37915 [Thread-11-words] INFO storm.kafka.PartitionManager - Starting Kafka
127.0.0.1:0 from offset 185

It says that it could not find an offset stored in zookeeper and that it is
starting from offset 185, which I presume is the end of the topic.  This
means it will not consume what is already in the topic, only new messages.

I think what you want is to change it to start at the head of the topic
when there is no offset available (or maybe to force from start always
regardless of offset in zookeeper?)

It's not clear to me which version of the kafka spout you are using, but it
looks like to achieve this in 0.9.6 the fields you should be looking at are
as follows:

kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
kafkaConfig.forceFromStart = true;
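
For reference, a minimal sketch of how those two fields could slot into the SpoutConfig from the code earlier in this thread (assuming the storm-kafka 0.9.x API discussed here; field names may differ in other versions):

        SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "twitter-topic", "/twitter", "twitter-topic-id");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Start from the earliest available offset when no offset is stored in
        // ZooKeeper, and ignore any previously stored offset on restart.
        kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
        kafkaConfig.forceFromStart = true;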


Cheers,
John

On Wed, Mar 30, 2016 at 2:30 AM david kavanagh <da...@hotmail.com>
wrote:

> Thanks for the reply!
>
> I added the line as you suggested but there is still no difference
> unfortunately.
> I am just guessing at this stage but judging by the output below it, it
> seems like it is something to do with the partitioning or the offset.
> The warnings start by stating that there are more tasks than partitions.
> Task 1 is assigned the partition that is created in the code (highlighted
> in green), then the rest of the tasks are not assigned any partitions.
> Eventually it states 'Read partition information from:
> /twitter/twitter-topic-id/partition_0  --> null'
>
> So it seems like it is not reading data from Kafka at all. I really don't
> understand what is going on here.
> Any ideas?
>
>
> Kind Regards
>
> David
>
> --------------------------------------------------
>
> *Storm Output:*
>
> Thread-9-print] INFO  backtype.storm.daemon.executor - Prepared bolt
> print:(2)
> 32644 [Thread-11-words] INFO
>  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
> 32685 [Thread-19-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-19-words] WARN  storm.kafka.KafkaUtils - Task [5/10] no
> partitions assigned
> 32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - Task [4/10] no
> partitions assigned
> 32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - Task [3/10] no
> partitions assigned
> 32686 [Thread-11-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-11-words] INFO  storm.kafka.KafkaUtils - Task [1/10]
> assigned [Partition{host=127.0.0.1:9092, partition=0}]
> 32687 [Thread-29-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32697 [Thread-19-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-25-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-29-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-13-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-27-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-15-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32689 [Thread-19-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(7)
> 32689 [Thread-25-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32689 [Thread-15-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(5)
> 32738 [Thread-25-words] WARN  storm.kafka.KafkaUtils - Task [8/10] no
> partitions assigned
> 32738 [Thread-25-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(10)
> 32689 [Thread-17-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(6)
> 32688 [Thread-13-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32688 [Thread-21-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32739 [Thread-21-words] WARN  storm.kafka.KafkaUtils - Task [6/10] no
> partitions assigned
> 32739 [Thread-21-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(8)
> 32687 [Thread-27-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-27-words] WARN  storm.kafka.KafkaUtils - Task [9/10] no
> partitions assigned
> 32740 [Thread-27-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(11)
> 32687 [Thread-23-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-23-words] WARN  storm.kafka.KafkaUtils - Task [7/10] no
> partitions assigned
> 32736 [Thread-29-words] WARN  storm.kafka.KafkaUtils - Task [10/10] no
> partitions assigned
> 32742 [Thread-17-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(6)
> 32872 [Thread-29-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(12)
> 32742 [Thread-25-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(10)
> 32742 [Thread-21-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(8)
> 32742 [Thread-27-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(11)
> 32742 [Thread-19-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(7)
> 32741 [Thread-15-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(5)
> 32740 [Thread-13-words] WARN  storm.kafka.KafkaUtils - Task [2/10] no
> partitions assigned
> 32873 [Thread-29-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(12)
> 32872 [Thread-23-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(9)
> 32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(4)
> 32873 [Thread-23-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(9)
> 32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(4)
> 37756 [Thread-23-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 37757 [Thread-17-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 37757 [Thread-21-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 37757 [Thread-11-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 37773 [Thread-11-words] INFO  storm.kafka.PartitionManager - Read
> partition information from: /twitter/twitter-topic-id/partition_0  --> null
> 37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - No partition
> information found, using configuration to determine offset
> 37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - Starting
> Kafka 127.0.0.1:0 from offset 185
> 37916 [Thread-11-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(3)
> 37917 [Thread-11-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(3)
> 62005 [Thread-11-words] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 62013 [Thread-13-words] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
>
>
> ------------------------------
> Date: Wed, 30 Mar 2016 10:10:54 +0530
> Subject: Re: Storm KafkaSpout Integration
> From: dkiralam@aadhya-analytics.com
> To: user@storm.apache.org
>
>
> Hi david,
>
> I think everything is good but you are missing a statement
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); add it after the line
> config.setDebug(true);
>
> *Best regards,*
> *K.Sai Dilip Reddy.*
>
> On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <da...@hotmail.com>
> wrote:
>
> Hi all,
>
> I am currently trying use TestTopologyStaticHosts to try connect the
> KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance
> running on my localhost. I have a topic named "twitter-topic" that has some
> tweets in it. This is all working as expected. I can run the consumer in
> the terminal and it returns the tweets. I want to use the KafkaSpout to
> connect to the Kafka topic and pull the tweets into a topology. I have
> been working on this a few days now and no success.
>
> So far i have learned that when Storm is run in local mode that it uses an
> in memory zookeeper on port 2000, which would not allow it to connect to
> the Kafka topic. I have tried to get around this using the following syntax
> that i found online:
>
> LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>
> It is still not working but it seems to be connecting to Kafka as it gives
> a 'closed socket connection' message when i cancel the operation (after it
> does not work and hangs open). It also says in the storm output that it is
> connected to localhost 2181 so it seems to be getting that far. I have
> included the full output from Storm in a txt file attached.
>
> Here is the code i am using in the TestTopologyStaticHosts class:
>
>  public static void main(String[] args) throws Exception {
>
>         //String zkConnString = "localhost:2181";
>
>         GlobalPartitionInformation hostsAndPartitions = new
> GlobalPartitionInformation();
>         hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
>         BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
>         // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");
>
>         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts,
> "twitter-topic","/twitter","twitter-topic-id");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         //kafkaConfig.forceStartOffsetTime(-2);
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
>         builder.setBolt("print", new
> PrinterBolt()).shuffleGrouping("words");
>         LocalCluster cluster = new LocalCluster("localhost", new
> Long(2181));
>         Config config = new Config();
>         config.setDebug(true);
>         // config.put("storm.zookeeper.servers", "localhost");
>         // config.put("storm.zookeeper.port", "2181");
>         cluster.submitTopology("kafka-test", config,
> builder.createTopology());
>
>         Thread.sleep(600000);
>
>     }
>
> Judging by the output it seems that there is a problem with connecting to
> the Kafka partitions.
> I have tried many different things to get it to work but no luck. I have
> also been looking at using the KafkaSpoutTestTopology class but it is
> expecting arguments including 'dockerIp' which i don't understand.
>
> Should i be using Storm in localmode?
> Should i be using the TestTopologyStaticHosts class or would the KafkaSpoutTestTopology
> class be better?
>
> Any help at all would be greatly appreciated because i am really stuck.
>
> Kind Regards
> David Kavanagh
>
>
>

RE: Storm KafkaSpout Integration

Posted by david kavanagh <da...@hotmail.com>.
Larry,
Could you please explain yourself? A single-word email telling me what to do is confusing and ignorant. Why are you telling me to unsubscribe? I was trying to be helpful by giving someone some code that they requested.
Regards
David

Date: Fri, 1 Apr 2016 10:25:51 +0100
Subject: Re: Storm KafkaSpout Integration
From: larryakah@gmail.com
To: user@storm.apache.org

unsubscribe
2016-04-01 10:18 GMT+01:00 david kavanagh <da...@hotmail.com>:



Hey,
What i found online was a simple Java Kafka consumer, and i used that code to write my own KafkaSpout. Here is the link to the consumer that i found: http://wpcertification.blogspot.ie/2014/08/java-client-for-publishing-and.html
I am using my implementation for a college assignment. It's working well enough for me to do what i need, but i doubt very much that it would be production quality. Here is the code anyway; it might be useful for something.
Regards
David
-----------------------------------------------------------------------
package storm.kafka;
import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.PartitionManager.KafkaMessageId;

import java.util.*;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

public class MyKafkaSpout extends BaseRichSpout {

    SpoutOutputCollector _collector;
    KafkaStream<byte[], byte[]> _stream;

    public static class KafkaConnector extends Thread {

        final static String clientId = "KafkaTweetConsumer";
        final static String TOPIC = "twitter-topic";
        ConsumerConnector consumerConnector;

        public KafkaConnector() {
            Properties properties = new Properties();
            properties.put("zookeeper.connect", "localhost:2181");
            properties.put("group.id", "test-group");
            ConsumerConfig consumerConfig = new ConsumerConfig(properties);
            consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        }

        public KafkaStream<byte[], byte[]> getStream() {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(TOPIC, new Integer(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> kafkaStream = consumerMap.get(TOPIC).get(0);
            return kafkaStream;
        }
    }

    @Override
    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
        _collector = collector;
        KafkaConnector kafkaConnector = new KafkaConnector();
        _stream = kafkaConnector.getStream();
    }

    @Override
    public void nextTuple() {
        ConsumerIterator<byte[], byte[]> it = _stream.iterator();
        String message = new String(it.next().message());
        _collector.emit(new Values(message));
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweet"));
    }
}
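
For completeness, a rough sketch of how a spout like this could be wired into the local-mode topology from earlier in the thread (untested; the spout id "tweets" is just an illustrative name, and PrinterBolt is the same bolt used above):

        TopologyBuilder builder = new TopologyBuilder();
        // MyKafkaSpout creates its own high-level consumer, so a parallelism
        // hint of 1 is enough for the single-partition "twitter-topic".
        builder.setSpout("tweets", new MyKafkaSpout(), 1);
        builder.setBolt("print", new PrinterBolt()).shuffleGrouping("tweets");

        Config config = new Config();
        config.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafka-test", config, builder.createTopology());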
 

Date: Thu, 31 Mar 2016 11:12:38 -0700
Subject: Re: Storm KafkaSpout Integration
From: tech.login.id2@gmail.com
To: user@storm.apache.org

Hey David,
I would be interested in seeing what Kafka-Spouts you found online and why you found them better. Also, if you have your own Kafka-Spout opensourced in github, a link to that would be great too.
Thanks
Tid
On Thu, Mar 31, 2016 at 2:21 AM, david kavanagh <da...@hotmail.com> wrote:



Hi Spico,
I changed the parallelism as you suggested but it didn't work. Yesterday evening i gave up on using the KafkaSpout class that comes with storm. I found some Kafka consumer java classes online and wrote my own Kafka spout which is working fine. Thanks for the advice anyway. 
Regards
David
Date: Wed, 30 Mar 2016 21:33:38 +0300
Subject: Re: Storm KafkaSpout Integration
From: spicoflorin@gmail.com
To: user@storm.apache.org

hi,
 i think the problem you have is that you have set up one partition for the topic, but you try to consume it with 10 kafka spout tasks.
check this line: builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
10 represents the task parallelism for the spout, which in the case of kafka should be the same number as the partitions you have set up for the kafka topic. you use more than one kafka partition when you would like to consume the data from the topic in parallel. please check the very good documentation on kafka partitions on the confluent site.
 in my opinion, setting your parallelism hint to 1 would solve the problem. the max spout pending has a different meaning.
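
For clarity, a short sketch of that change, with the parallelism hint matched to the single partition configured earlier in the thread (names reused from that code; a sketch, not a tested fix):

        // One KafkaSpout task per Kafka partition: the topic has a single
        // partition, so a parallelism hint of 1 avoids idle spout tasks.
        builder.setSpout("words", new KafkaSpout(kafkaConfig), 1);
        builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");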
regards,
florin

On Wednesday, March 30, 2016, david kavanagh <da...@hotmail.com> wrote:
> I am only creating one partition in code here:
>  GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
>  hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
>  BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
> I hope that answered your question. I am new to both Storm and Kafka so i am not sure exactly how it works. 
> If i am understanding you correctly, the line you told me to add in the first email should work because i am only creating one partition?
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
> Thanks again for the help :-)
> David
> ________________________________
> Date: Wed, 30 Mar 2016 15:36:19 +0530
> Subject: Re: Storm KafkaSpout Integration
> From: dkiralam@aadhya-analytics.com
> To: user@storm.apache.org
>
>
> Hi david,
>
> Can I ask how many partitions you have?
> The statement I have given you is the default. If you are running with more partitions, make sure you give the same number, e.g. if you are running with two partitions, change the number to 2 in the statement:
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
>  
> Best regards,
> K.Sai Dilip Reddy.
> On Wed, Mar 30, 2016 at 3:00 PM, david kavanagh <da...@hotmail.com> wrote:
>
> Thanks for the reply!
> I added the line as you suggested but there is still no difference unfortunately. 
> I am just guessing at this stage but judging by the output below it, it seems like it is something to do with the partitioning or the offset.
> The warnings start by stating that there are more tasks than partitions.
> Task 1 is assigned the partition that is created in the code (highlighted in green), then the rest of the tasks are not assigned any partitions.
> Eventually it states 'Read partition information from: /twitter/twitter-topic-id/partition_0  --> null'
> So it seems like it is not reading data from Kafka at all. I really don't understand what is going on here.
> Any ideas?
>
> Kind Regards
> David
> --------------------------------------------------
> Storm Output:
> Thread-9-print] INFO  backtype.storm.daemon.executor - Prepared bolt print:(2)
> 32644 [Thread-11-words] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
> 32685 [Thread-19-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-19-words] WARN  storm.kafka.KafkaUtils - Task [5/10] no partitions assigned
> 32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - Task [4/10] no partitions assigned
> 32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - Task [3/10] no partitions assigned
> 32686 [Thread-11-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-11-words] INFO  storm.kafka.KafkaUtils - Task [1/10] assigned [Partition{host=127.0.0.1:9092, partition=0}]
> 32687 [Thread-29-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32697 [Thread-19-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-25-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-29-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-13-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-27-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-15-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32689 [Thread-19-words] INFO  backtype.storm.daemon.executor - Opened spout words:(7)
> 32689 [Thread-25-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32689 [Thread-15-words] INFO  backtype.storm.daemon.executor - Opened spout words:(5)
> 32738 [Thread-25-words] WARN  storm.kafka.KafkaUtils - Task [8/10] no partitions assigned
> 32738 [Thread-25-words] INFO  backtype.storm.daemon.executor - Opened spout words:(10)
> 32689 [Thread-17-words] INFO  backtype.storm.daemon.executor - Opened spout words:(6)
> 32688 [Thread-13-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32688 [Thread-21-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32739 [Thread-21-words] WARN  storm.kafka.KafkaUtils - Task [6/10] no partitions assigned
> 32739 [Thread-21-words] INFO  backtype.storm.daemon.executor - Opened spout words:(8)
> 32687 [Thread-27-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-27-words] WARN  storm.kafka.KafkaUtils - Task [9/10] no partitions assigned
> 32740 [Thread-27-words] INFO  backtype.storm.daemon.executor - Opened spout words:(11)
> 32687 [Thread-23-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-23-words] WARN  storm.kafka.KafkaUtils - Task [7/10] no partitions assigned
> 32736 [Thread-29-words] WARN  storm.kafka.KafkaUtils - Task [10/10] no partitions assigned
> 32742 [Thread-17-words] INFO  backtype.storm.daemon.executor - Activating spout words:(6)
> 32872 [Thread-29-words] INFO  backtype.storm.daemon.executor - Opened spout words:(12)
> 32742 [Thread-25-words] INFO  backtype.storm.daemon.executor - Activating spout words:(10)
> 32742 [Thread-21-words] INFO  backtype.storm.daemon.executor - Activating spout words:(8)
> 32742 [Thread-27-words] INFO  backtype.storm.daemon.executor - Activating spout words:(11)
> 32742 [Thread-19-words] INFO  backtype.storm.daemon.executor - Activating spout words:(7)
> 32741 [Thread-15-words] INFO  backtype.storm.daemon.executor - Activating spout words:(5)
> 32740 [Thread-13-words] WARN  storm.kafka.KafkaUtils - Task [2/10] no partitions assigned
> 32873 [Thread-29-words] INFO  backtype.storm.daemon.executor - Activating spout words:(12)
> 32872 [Thread-23-words] INFO  backtype.storm.daemon.executor - Opened spout words:(9)
> 32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Opened spout words:(4)
> 32873 [Thread-23-words] INFO  backtype.storm.daemon.executor - Activating spout words:(9)
> 32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Activating spout words:(4)
> 37756 [Thread-23-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-17-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-21-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-11-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37773 [Thread-11-words] INFO  storm.kafka.PartitionManager - Read partition information from: /twitter/twitter-topic-id/partition_0  --> null
> 37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 185
> 37916 [Thread-11-words] INFO  backtype.storm.daemon.executor - Opened spout words:(3)
> 37917 [Thread-11-words] INFO  backtype.storm.daemon.executor - Activating spout words:(3)
> 62005 [Thread-11-words] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> 62013 [Thread-13-words] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
>
> ________________________________
> Date: Wed, 30 Mar 2016 10:10:54 +0530
> Subject: Re: Storm KafkaSpout Integration
> From: dkiralam@aadhya-analytics.com
> To: user@storm.apache.org
>
> Hi david,
>
> I think everything is good but you are missing a statement
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); add it after the line  config.setDebug(true);
>  
> Best regards,
> K.Sai Dilip Reddy.
> On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <da...@hotmail.com> wrote:
>
> Hi all,
> I am currently trying use TestTopologyStaticHosts to try connect the KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance running on my localhost. I have a topic named "twitter-topic" that has some tweets in it. This is all working as expected. I can run the consumer in the terminal and it returns the tweets. I want to use the KafkaSpout to connect to the Kafka topic and pull the tweets into a topology. I have been working on this a few days now and no success.
> So far i have learned that when Storm is run in local mode that it uses an in memory zookeeper on port 2000, which would not allow it to connect to the Kafka topic. I have tried to get around this using the following syntax that i found online:
> LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
> It is still not working but it seems to be connecting to Kafka as it gives a 'closed socket connection' message when i cancel the operation (after it does not work and hangs open). It also says in the storm output that it is connected to localhost 2181 so it seems to be getting that far. I have included the full output from Storm in a txt file attached.
> Here is the code i am using in the TestTopologyStaticHosts class:
>  public static void main(String[] args) throws Exception {
>         //String zkConnString = "localhost:2181";
>         GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
>         hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
>         BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
>         // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");
>         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "twitter-topic","/twitter","twitter-topic-id");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         //kafkaConfig.forceStartOffsetTime(-2);
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
>         builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
>         LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>         Config config = new Config();
>         config.setDebug(true);
>         // config.put("storm.zookeeper.servers", "localhost");
>         // config.put("storm.zookeeper.port", "2181"); 
>         cluster.submitTopology("kafka-test", config, builder.createTopology());
>         Thread.sleep(600000);
>     }
> Judging by the output it seems that there is a problem with connecting to the Kafka partitions.
> I have tried many different things to get it to work but no luck. I have also been looking at using the KafkaSpoutTestTopology class but it is expecting arguments including 'dockerIp' which i don't understand.
> Should i be using Storm in localmode?
> Should i be using the TestTopologyStaticHosts class or would the KafkaSpoutTestTopology class be better?
> Any help at all would be greatly appreciated because i am really stuck.
> Kind Regards
> David Kavanagh
>
>
> 		 	   		  

 		 	   		  


-- 
*Akah Larry N.H*

*Android Platform Engineer*
*Founder IceTeck*
*www.iceteck.com*

Developing technologies for emergence and sustainable development.


 		 	   		  

Re: Storm KafkaSpout Integration

Posted by Larry Akah <la...@gmail.com>.
unsubscribe

2016-04-01 10:18 GMT+01:00 david kavanagh <da...@hotmail.com>:

> Hey,
>
> What i found online was a simple java Kafka Consumer, and i used that code
> to write my own KafkaSpout.
> Here is the link to the consumer that i found:
> http://wpcertification.blogspot.ie/2014/08/java-client-for-publishing-and.html
>
> I am using my implementation for a college assignment. Its working well
> enough for me to do what i need, but i doubt very much that it would be
> production quality. Here is the code anyway, it might be useful for
> something.
>
> Regards
> David
>
> -----------------------------------------------------------------------
>
> package storm.kafka;
>
> import backtype.storm.Config;
> import backtype.storm.metric.api.IMetric;
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichSpout;
> import kafka.message.Message;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> import backtype.storm.utils.Utils;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import storm.kafka.PartitionManager.KafkaMessageId;
>
> import java.util.*;
>
> import java.io.UnsupportedEncodingException;
> import java.nio.ByteBuffer;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> import kafka.consumer.Consumer;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.message.ByteBufferMessageSet;
> import kafka.message.MessageAndOffset;
>
> public class MyKafkaSpout extends BaseRichSpout {
>
>     SpoutOutputCollector _collector;
>     KafkaStream<byte[], byte[]> _stream;
>
>     public static class KafkaConnector extends Thread {
>
>     final static String clientId = "KafkaTweetConsumer";
>     final static String TOPIC = "twitter-topic";
>     ConsumerConnector consumerConnector;
>
>     public KafkaConnector(){
>        Properties properties = new Properties();
>        properties.put("zookeeper.connect","localhost:2181");
>        properties.put("group.id","test-group");
>        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
>        consumerConnector =
> Consumer.createJavaConsumerConnector(consumerConfig);
>     }
>
>     public KafkaStream<byte[], byte[]> getStream() {
>         Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>         topicCountMap.put(TOPIC, new Integer(1));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumerConnector.createMessageStreams(topicCountMap);
>         KafkaStream<byte[], byte[]> kafkaStream =
>  consumerMap.get(TOPIC).get(0);
>         return kafkaStream;
>     }
>     }
>
>
>
> @Override
>     public void open(Map conf, final TopologyContext context, final
> SpoutOutputCollector collector) {
>     _collector = collector;
>
>     KafkaConnector kafkaConnector = new KafkaConnector();
>     _stream = kafkaConnector.getStream();
>
>     }
>
>     @Override
>   public void nextTuple() {
>     ConsumerIterator<byte[], byte[]> it = _stream.iterator();
>     String message = new String(it.next().message());
>     _collector.emit(new Values(message));
>   }
>
>   @Override
>   public void ack(Object id) {
>   }
>
>   @Override
>   public void fail(Object id) {
>   }
>
>   @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>        declarer.declare(new Fields("tweet"));
>     }
> }
>
>
>
> ------------------------------
> Date: Thu, 31 Mar 2016 11:12:38 -0700
> Subject: Re: Storm KafkaSpout Integration
> From: tech.login.id2@gmail.com
> To: user@storm.apache.org
>
> Hey David,
>
> I would be interested in seeing what Kafka-Spouts you found online and why
> you found them better.
> Also, if you have your own Kafka-Spout opensourced in github, a link to
> that would be great too.
>
> Thanks
> Tid
>
> On Thu, Mar 31, 2016 at 2:21 AM, david kavanagh <da...@hotmail.com>
> wrote:
>
> Hi Spico,
>
> I changed the parallelism as you suggested but it didn't work. Yesterday
> evening i gave up on using the KafkaSpout class that comes with storm. I
> found some Kafka consumer java classes online and wrote my own Kafka spout
> which is working fine. Thanks for the advice anyway.
>
> Regards
> David
>
> ------------------------------
> Date: Wed, 30 Mar 2016 21:33:38 +0300
> Subject: Re: Storm KafkaSpout Integration
> From: spicoflorin@gmail.com
> To: user@storm.apache.org
>
> hi,
> i think the problem you have is that you have set up one partition for the
> topic, but you try to consume it with 10 kafka spout tasks.
> check this line: builder.setSpout("words", new KafkaSpout(kafkaConfig),
> 10);
> 10 represents the task parallelism for the spout, which in the case of
> kafka should be the same number as the partitions you have set up for the
> kafka topic. you use more than one kafka partition when you would like to
> consume the data from the topic in parallel. please check the very good
> documentation on kafka partitions on the confluent site.
> in my opinion, setting your parallelism hint to 1 would solve the problem.
> the max spout pending has a different meaning.
> regards,
> florin
>
> On Wednesday, March 30, 2016, david kavanagh <da...@hotmail.com>
> wrote:
> > I am only creating one partition in code here:
> >  GlobalPartitionInformation hostsAndPartitions = new
> GlobalPartitionInformation();
> >  hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
> >  BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
> > I hope that answered your question. I am new to both Storm and Kafka so
> i am not sure exactly how it works.
> > If i am understanding you correctly, the line you told me to add in the
> first email should work because i am only creating one partition?
> > config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
> > Thanks again for the help :-)
> > David
> > ________________________________
> > Date: Wed, 30 Mar 2016 15:36:19 +0530
> > Subject: Re: Storm KafkaSpout Integration
> > From: dkiralam@aadhya-analytics.com
> > To: user@storm.apache.org
> >
> >
> > Hi david,
> >
> > Can I ask how many partitions you have?
> > The statement I have given you is the default. If you are running with
> more partitions, make sure you give the same number, e.g. if you are
> running with two partitions, change the number to 2 in the statement:
> > config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
> >
> > Best regards,
> > K.Sai Dilip Reddy.
> > On Wed, Mar 30, 2016 at 3:00 PM, david kavanagh <da...@hotmail.com>
> wrote:
> >
> > Thanks for the reply!
> > I added the line as you suggested but there is still no difference
> unfortunately.
> > I am just guessing at this stage but judging by the output below it, it
> seems like it is something to do with the partitioning or the offset.
> > The warnings start by stating that there are more tasks than
> partitions.
> > Task 1 is assigned the partition that is created in the code
> (highlighted in green), then the rest of the tasks are not assigned any
> partitions.
> > Eventually it states 'Read partition information from:
> /twitter/twitter-topic-id/partition_0  --> null'
> > So it seems like it is not reading data from Kafka at all. I really
> don't understand what is going on here.
> > Any ideas?
> >
> > Kind Regards
> > David
> > --------------------------------------------------
> > Storm Output:
> > Thread-9-print] INFO  backtype.storm.daemon.executor - Prepared bolt
> print:(2)
> > 32644 [Thread-11-words] INFO
>  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
> > 32685 [Thread-19-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32686 [Thread-19-words] WARN  storm.kafka.KafkaUtils - Task [5/10] no
> partitions assigned
> > 32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - Task [4/10] no
> partitions assigned
> > 32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - Task [3/10] no
> partitions assigned
> > 32686 [Thread-11-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32686 [Thread-11-words] INFO  storm.kafka.KafkaUtils - Task [1/10]
> assigned [Partition{host=127.0.0.1:9092, partition=0}]
> > 32687 [Thread-29-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32697 [Thread-19-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> > 32697 [Thread-25-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> > 32697 [Thread-29-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> > 32697 [Thread-13-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> > 32697 [Thread-27-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> > 32697 [Thread-15-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> > 32689 [Thread-19-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(7)
> > 32689 [Thread-25-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32689 [Thread-15-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(5)
> > 32738 [Thread-25-words] WARN  storm.kafka.KafkaUtils - Task [8/10] no
> partitions assigned
> > 32738 [Thread-25-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(10)
> > 32689 [Thread-17-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(6)
> > 32688 [Thread-13-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32688 [Thread-21-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32739 [Thread-21-words] WARN  storm.kafka.KafkaUtils - Task [6/10] no
> partitions assigned
> > 32739 [Thread-21-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(8)
> > 32687 [Thread-27-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32740 [Thread-27-words] WARN  storm.kafka.KafkaUtils - Task [9/10] no
> partitions assigned
> > 32740 [Thread-27-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(11)
> > 32687 [Thread-23-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32740 [Thread-23-words] WARN  storm.kafka.KafkaUtils - Task [7/10] no
> partitions assigned
> > 32736 [Thread-29-words] WARN  storm.kafka.KafkaUtils - Task [10/10] no
> partitions assigned
> > 32742 [Thread-17-words] INFO  backtype.storm.daemon.executor -
> Activating spout words:(6)
> > 32872 [Thread-29-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(12)
> > 32742 [Thread-25-words] INFO  backtype.storm.daemon.executor -
> Activating spout words:(10)
> > 32742 [Thread-21-words] INFO  backtype.storm.daemon.executor -
> Activating spout words:(8)
> > 32742 [Thread-27-words] INFO  backtype.storm.daemon.executor -
> Activating spout words:(11)
> > 32742 [Thread-19-words] INFO  backtype.storm.daemon.executor -
> Activating spout words:(7)
> > 32741 [Thread-15-words] INFO  backtype.storm.daemon.executor -
> Activating spout words:(5)
> > 32740 [Thread-13-words] WARN  storm.kafka.KafkaUtils - Task [2/10] no
> partitions assigned
> > 32873 [Thread-29-words] INFO  backtype.storm.daemon.executor -
> Activating spout words:(12)
> > 32872 [Thread-23-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(9)
> > 32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(4)
> > 32873 [Thread-23-words] INFO  backtype.storm.daemon.executor -
> Activating spout words:(9)
> > 32873 [Thread-13-words] INFO  backtype.storm.daemon.executor -
> Activating spout words:(4)
> > 37756 [Thread-23-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> > 37757 [Thread-17-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> > 37757 [Thread-21-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> > 37757 [Thread-11-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> > 37773 [Thread-11-words] INFO  storm.kafka.PartitionManager - Read
> partition information from: /twitter/twitter-topic-id/partition_0  --> null
> > 37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - No
> partition information found, using configuration to determine offset
> > 37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - Starting
> Kafka 127.0.0.1:0 from offset 185
> > 37916 [Thread-11-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(3)
> > 37917 [Thread-11-words] INFO  backtype.storm.daemon.executor -
> Activating spout words:(3)
> > 62005 [Thread-11-words] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __tick, id: {},
> [30]
> > 62013 [Thread-13-words] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __tick, id: {},
> [30]
> >
> > ________________________________
> > Date: Wed, 30 Mar 2016 10:10:54 +0530
> > Subject: Re: Storm KafkaSpout Integration
> > From: dkiralam@aadhya-analytics.com
> > To: user@storm.apache.org
> >
> > Hi david,
> >
> > I think everything is good but you are missing a statement
> > config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); add it after the line
> config.setDebug(true);
> >
> > Best regards,
> > K.Sai Dilip Reddy.
> > On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <da...@hotmail.com>
> wrote:
> >
> > Hi all,
> > I am currently trying use TestTopologyStaticHosts to try connect the
> KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance
> running on my localhost. I have a topic named "twitter-topic" that has some
> tweets in it. This is all working as expected. I can run the consumer in
> the terminal and it returns the tweets. I want to use the KafkaSpout to
> connect to the Kafka topic and pull the tweets into a topology. I have been
> working on this a few days now and no success.
> > So far i have learned that when Storm is run in local mode that it uses
> an in memory zookeeper on port 2000, which would not allow it to connect to
> the Kafka topic. I have tried to get around this using the following syntax
> that i found online:
> > LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
> > It is still not working but it seems to be connecting to Kafka as it
> gives a 'closed socket connection' message when i cancel the operation
> (after it does not work and hangs open). It also says in the storm output
> that it is connected to localhost 2181 so it seems to be getting that far.
> I have included the full output from Storm in a txt file attached.
> > Here is the code i am using in the TestTopologyStaticHosts class:
> >  public static void main(String[] args) throws Exception {
> >         //String zkConnString = "localhost:2181";
> >         GlobalPartitionInformation hostsAndPartitions = new
> GlobalPartitionInformation();
> >         hostsAndPartitions.addPartition(0, new Broker("127.0.0.1",
> 9092));
> >         BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
> >         // BrokerHosts brokerHosts = new ZkHosts(zkConnString,
> "/brokers");
> >         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts,
> "twitter-topic","/twitter","twitter-topic-id");
> >         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> >         //kafkaConfig.forceStartOffsetTime(-2);
> >         TopologyBuilder builder = new TopologyBuilder();
> >         builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
> >         builder.setBolt("print", new
> PrinterBolt()).shuffleGrouping("words");
> >         LocalCluster cluster = new LocalCluster("localhost", new
> Long(2181));
> >         Config config = new Config();
> >         config.setDebug(true);
> >         // config.put("storm.zookeeper.servers", "localhost");
> >         // config.put("storm.zookeeper.port", "2181");
> >         cluster.submitTopology("kafka-test", config,
> builder.createTopology());
> >         Thread.sleep(600000);
> >     }
> > Judging by the output it seems that there is a problem with connecting
> to the Kafka partitions.
> > I have tried many different things to get it to work but no luck. I have
> also been looking at using the KafkaSpoutTestTopology class but it is
> expecting arguments including 'dockerIp' which i don't understand.
> > Should i be using Storm in localmode?
> > Should i be using the TestTopologyStaticHosts class or would
> the KafkaSpoutTestTopology class be better?
> > Any help at all would be greatly appreciated because i am really stuck.
> > Kind Regards
> > David Kavanagh
> >
> >
> >
>
>
>


-- 
*Akah Larry N.H*

*Android Platform Engineer*
*Founder IceTeck*
*www.iceteck.com*

Developing technologies for emergence and sustainable development.

RE: Storm KafkaSpout Integration

Posted by david kavanagh <da...@hotmail.com>.
Hey,
What i found online was a simple Java Kafka consumer, and i used that code to write my own KafkaSpout. Here is the link to the consumer that i found: http://wpcertification.blogspot.ie/2014/08/java-client-for-publishing-and.html
I am using my implementation for a college assignment. It's working well enough for me to do what i need, but i doubt very much that it would be production quality. Here is the code anyway; it might be useful for something.
Regards
David
-----------------------------------------------------------------------
package storm.kafka;
import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.PartitionManager.KafkaMessageId;

import java.util.*;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

public class MyKafkaSpout extends BaseRichSpout {

    SpoutOutputCollector _collector;
    KafkaStream<byte[], byte[]> _stream;

    public static class KafkaConnector extends Thread {

        final static String clientId = "KafkaTweetConsumer";
        final static String TOPIC = "twitter-topic";
        ConsumerConnector consumerConnector;

        public KafkaConnector() {
            Properties properties = new Properties();
            properties.put("zookeeper.connect", "localhost:2181");
            properties.put("group.id", "test-group");
            ConsumerConfig consumerConfig = new ConsumerConfig(properties);
            consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        }

        public KafkaStream<byte[], byte[]> getStream() {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(TOPIC, new Integer(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> kafkaStream = consumerMap.get(TOPIC).get(0);
            return kafkaStream;
        }
    }

    @Override
    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
        _collector = collector;
        KafkaConnector kafkaConnector = new KafkaConnector();
        _stream = kafkaConnector.getStream();
    }

    @Override
    public void nextTuple() {
        ConsumerIterator<byte[], byte[]> it = _stream.iterator();
        String message = new String(it.next().message());
        _collector.emit(new Values(message));
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweet"));
    }
}
 

Date: Thu, 31 Mar 2016 11:12:38 -0700
Subject: Re: Storm KafkaSpout Integration
From: tech.login.id2@gmail.com
To: user@storm.apache.org

Hey David,
I would be interested in seeing what Kafka-Spouts you found online and why you found them better. Also, if you have your own Kafka-Spout opensourced in github, a link to that would be great too.
Thanks
Tid
On Thu, Mar 31, 2016 at 2:21 AM, david kavanagh <da...@hotmail.com> wrote:



Hi Spico,
I changed the parallelism as you suggested but it didn't work. Yesterday evening i gave up on using the KafkaSpout class that comes with storm. I found some Kafka consumer java classes online and wrote my own Kafka spout which is working fine. Thanks for the advice anyway. 
Regards
David
Date: Wed, 30 Mar 2016 21:33:38 +0300
Subject: Re: Storm KafkaSpout Integration
From: spicoflorin@gmail.com
To: user@storm.apache.org

hi,
 i think the problem you have is that you have set up one partition for the topic, but you try to consume it with 10 kafka spout tasks.
check this line: builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
10 represents the task parallelism for the spout, which in the case of kafka should be the same number as the partitions you have set up for the kafka topic. you use more than one kafka partition when you would like to consume the data from the topic in parallel. please check the very good documentation on kafka partitions on the confluent site.
 in my opinion, setting your parallelism hint to 1 would solve the problem. the max spout pending has a different meaning.
regards,
florin

On Wednesday, March 30, 2016, david kavanagh <da...@hotmail.com> wrote:
> I am only creating one partition in code here:
>  GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
>  hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
>  BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
> I hope that answered your question. I am new to both Storm and Kafka so i am not sure exactly how it works. 
> If i am understanding you correctly, the line you told me to add in the first email should work because i am only creating one partition?
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
> Thanks again for the help :-)
> David
> ________________________________
> Date: Wed, 30 Mar 2016 15:36:19 +0530
> Subject: Re: Storm KafkaSpout Integration
> From: dkiralam@aadhya-analytics.com
> To: user@storm.apache.org
>
>
> Hi david,
>
> Can I know how many partitions you are having?
> The statement I have given to you is the default. If you are running with a number of partitions, make sure you give the same number, e.g. if you are running with two partitions change the number to 2 in the statement:
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,2 );
>  
> Best regards,
> K.Sai Dilip Reddy.
> On Wed, Mar 30, 2016 at 3:00 PM, david kavanagh <da...@hotmail.com> wrote:
>
> Thanks for the reply!
> I added the line as you suggested but there is still no difference unfortunately. 
> I am just guessing at this stage, but judging by the output below it seems like it is something to do with the partitioning or the offset.
> The warnings start by stating that there are more tasks than partitions.
> Task 1 is assigned the partition that is created in the code (highlighted in green), then the rest of the tasks are not assigned any partitions.
> Eventually it states 'Read partition information from: /twitter/twitter-topic-id/partition_0  --> null'
> So it seems like it is not reading data from Kafka at all. I really don't understand what is going on here.
> Any ideas?
>
> Kind Regards
> David
> --------------------------------------------------
> Storm Output:
> Thread-9-print] INFO  backtype.storm.daemon.executor - Prepared bolt print:(2)
> 32644 [Thread-11-words] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
> 32685 [Thread-19-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-19-words] WARN  storm.kafka.KafkaUtils - Task [5/10] no partitions assigned
> 32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - Task [4/10] no partitions assigned
> 32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - Task [3/10] no partitions assigned
> 32686 [Thread-11-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-11-words] INFO  storm.kafka.KafkaUtils - Task [1/10] assigned [Partition{host=127.0.0.1:9092, partition=0}]
> 32687 [Thread-29-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32697 [Thread-19-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-25-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-29-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-13-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-27-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-15-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32689 [Thread-19-words] INFO  backtype.storm.daemon.executor - Opened spout words:(7)
> 32689 [Thread-25-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32689 [Thread-15-words] INFO  backtype.storm.daemon.executor - Opened spout words:(5)
> 32738 [Thread-25-words] WARN  storm.kafka.KafkaUtils - Task [8/10] no partitions assigned
> 32738 [Thread-25-words] INFO  backtype.storm.daemon.executor - Opened spout words:(10)
> 32689 [Thread-17-words] INFO  backtype.storm.daemon.executor - Opened spout words:(6)
> 32688 [Thread-13-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32688 [Thread-21-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32739 [Thread-21-words] WARN  storm.kafka.KafkaUtils - Task [6/10] no partitions assigned
> 32739 [Thread-21-words] INFO  backtype.storm.daemon.executor - Opened spout words:(8)
> 32687 [Thread-27-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-27-words] WARN  storm.kafka.KafkaUtils - Task [9/10] no partitions assigned
> 32740 [Thread-27-words] INFO  backtype.storm.daemon.executor - Opened spout words:(11)
> 32687 [Thread-23-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-23-words] WARN  storm.kafka.KafkaUtils - Task [7/10] no partitions assigned
> 32736 [Thread-29-words] WARN  storm.kafka.KafkaUtils - Task [10/10] no partitions assigned
> 32742 [Thread-17-words] INFO  backtype.storm.daemon.executor - Activating spout words:(6)
> 32872 [Thread-29-words] INFO  backtype.storm.daemon.executor - Opened spout words:(12)
> 32742 [Thread-25-words] INFO  backtype.storm.daemon.executor - Activating spout words:(10)
> 32742 [Thread-21-words] INFO  backtype.storm.daemon.executor - Activating spout words:(8)
> 32742 [Thread-27-words] INFO  backtype.storm.daemon.executor - Activating spout words:(11)
> 32742 [Thread-19-words] INFO  backtype.storm.daemon.executor - Activating spout words:(7)
> 32741 [Thread-15-words] INFO  backtype.storm.daemon.executor - Activating spout words:(5)
> 32740 [Thread-13-words] WARN  storm.kafka.KafkaUtils - Task [2/10] no partitions assigned
> 32873 [Thread-29-words] INFO  backtype.storm.daemon.executor - Activating spout words:(12)
> 32872 [Thread-23-words] INFO  backtype.storm.daemon.executor - Opened spout words:(9)
> 32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Opened spout words:(4)
> 32873 [Thread-23-words] INFO  backtype.storm.daemon.executor - Activating spout words:(9)
> 32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Activating spout words:(4)
> 37756 [Thread-23-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-17-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-21-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-11-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37773 [Thread-11-words] INFO  storm.kafka.PartitionManager - Read partition information from: /twitter/twitter-topic-id/partition_0  --> null
> 37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 185
> 37916 [Thread-11-words] INFO  backtype.storm.daemon.executor - Opened spout words:(3)
> 37917 [Thread-11-words] INFO  backtype.storm.daemon.executor - Activating spout words:(3)
> 62005 [Thread-11-words] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> 62013 [Thread-13-words] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
>
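For what it is worth, the 'No partition information found, using configuration to determine offset' and 'Starting Kafka 127.0.0.1:0 from offset 185' lines above mean the spout found no stored consumer offset in ZooKeeper and starts wherever its configuration points it. If the goal is to replay the tweets already sitting in the topic, the forceStartOffsetTime call that is commented out in the original code is the relevant knob; a minimal sketch, assuming the same storm-kafka SpoutConfig as in TestTopologyStaticHosts:

    SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "twitter-topic", "/twitter", "twitter-topic-id");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    // -2 requests the earliest available offset, so messages already in the partition are
    // replayed; -1 would request the latest offset (only messages produced from now on).
    kafkaConfig.forceStartOffsetTime(-2);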
> ________________________________
> Date: Wed, 30 Mar 2016 10:10:54 +0530
> Subject: Re: Storm KafkaSpout Integration
> From: dkiralam@aadhya-analytics.com
> To: user@storm.apache.org
>
> Hi david,
>
> I think everything is good but you are missing a statement
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); add it after the line  config.setDebug(true);
>  
> Best regards,
> K.Sai Dilip Reddy.

Re: Storm KafkaSpout Integration

Posted by Tech Id <te...@gmail.com>.
Hey David,

I would be interested in seeing what Kafka-Spouts you found online and why
you found them better.
Also, if you have your own Kafka-Spout opensourced in github, a link to
that would be great too.

Thanks
Tid


RE: Storm KafkaSpout Integration

Posted by david kavanagh <da...@hotmail.com>.
Hi Spico,
I changed the parallelism as you suggested but it didn't work. Yesterday evening I gave up on using the KafkaSpout class that comes with Storm. I found some Kafka consumer Java classes online and wrote my own Kafka spout, which is working fine. Thanks for the advice anyway.
Regards,
David

Re: Storm KafkaSpout Integration

Posted by Spico Florin <sp...@gmail.com>.
hi,
i think the problem is that you have set up one partition for the topic, but
you try to consume with 10 kafka spout tasks.
check this line: builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
10 represents the task parallelism for the spout, which in the case of kafka
should be the same number as the partitions you have set up for the kafka
topic. you use more than one kafka partition when you would like to consume
the data from the topic in parallel. please check the very good documentation
on kafka partitions on the confluent site.
in my opinion, setting your parallelism hint to 1 would solve the problem.
the max spout pending has a different meaning.
regards,
florin
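A minimal sketch of the change suggested here, written against the TestTopologyStaticHosts code quoted earlier in the thread (only the parallelism hint changes):

    // One spout task for the single-partition "twitter-topic"; if parallel consumption is
    // needed later, raise the topic's partition count and this number together.
    builder.setSpout("words", new KafkaSpout(kafkaConfig), 1);
    builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");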


RE: Storm KafkaSpout Integration

Posted by david kavanagh <da...@hotmail.com>.
I am only creating one partition in code here:
 GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
 hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
 BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
I hope that answered your question. I am new to both Storm and Kafka, so I am not sure exactly how it works.
If I am understanding you correctly, the line you told me to add in the first email should work because I am only creating one partition?
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
Thanks again for the help :-)
David
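For reference, a sketch of the two settings being discussed, written against the original TestTopologyStaticHosts code; the comments reflect their standard Storm meanings rather than anything stated in this thread:

    Config config = new Config();
    config.setDebug(true);
    // TOPOLOGY_MAX_SPOUT_PENDING caps how many emitted tuples may be un-acked at once per
    // spout task; it throttles the spout and is unrelated to the Kafka partition count.
    // The setting that should match the number of partitions is the spout's parallelism
    // hint in builder.setSpout(...), as in the earlier reply.
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);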
Date: Wed, 30 Mar 2016 15:36:19 +0530
Subject: Re: Storm KafkaSpout Integration
From: dkiralam@aadhya-analytics.com
To: user@storm.apache.org


Hi david,

Can I know how many partitions you are having?
The statement I have given to you is the default. If you are running with a number of partitions, make sure you give the same number, e.g. if you are running with two partitions change the number to 2 in the statement:
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
Best regards,
K.Sai Dilip Reddy.

On Wed, Mar 30, 2016 at 3:00 PM, david kavanagh <da...@hotmail.com> wrote:



Thanks for the reply!
I added the line as you suggested but there is still no difference unfortunately.
I am just guessing at this stage, but judging by the output below it seems like it is something to do with the partitioning or the offset.
The warnings start by stating that there are more tasks than partitions.
Task 1 is assigned the partition that is created in the code (highlighted in green), then the rest of the tasks are not assigned any partitions.
Eventually it states 'Read partition information from: /twitter/twitter-topic-id/partition_0 --> null'
So it seems like it is not reading data from Kafka at all. I really don't understand what is going on here.
Any ideas?

Kind Regards
David
--------------------------------------------------
Storm Output:
Thread-9-print] INFO  backtype.storm.daemon.executor - Prepared bolt print:(2)
32644 [Thread-11-words] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
32685 [Thread-19-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32686 [Thread-19-words] WARN  storm.kafka.KafkaUtils - Task [5/10] no partitions assigned
32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - Task [4/10] no partitions assigned
32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - Task [3/10] no partitions assigned
32686 [Thread-11-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32686 [Thread-11-words] INFO  storm.kafka.KafkaUtils - Task [1/10] assigned [Partition{host=127.0.0.1:9092, partition=0}]
32687 [Thread-29-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32697 [Thread-19-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-25-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-29-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-13-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-27-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-15-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32689 [Thread-19-words] INFO  backtype.storm.daemon.executor - Opened spout words:(7)
32689 [Thread-25-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32689 [Thread-15-words] INFO  backtype.storm.daemon.executor - Opened spout words:(5)
32738 [Thread-25-words] WARN  storm.kafka.KafkaUtils - Task [8/10] no partitions assigned
32738 [Thread-25-words] INFO  backtype.storm.daemon.executor - Opened spout words:(10)
32689 [Thread-17-words] INFO  backtype.storm.daemon.executor - Opened spout words:(6)
32688 [Thread-13-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32688 [Thread-21-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32739 [Thread-21-words] WARN  storm.kafka.KafkaUtils - Task [6/10] no partitions assigned
32739 [Thread-21-words] INFO  backtype.storm.daemon.executor - Opened spout words:(8)
32687 [Thread-27-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32740 [Thread-27-words] WARN  storm.kafka.KafkaUtils - Task [9/10] no partitions assigned
32740 [Thread-27-words] INFO  backtype.storm.daemon.executor - Opened spout words:(11)
32687 [Thread-23-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32740 [Thread-23-words] WARN  storm.kafka.KafkaUtils - Task [7/10] no partitions assigned
32736 [Thread-29-words] WARN  storm.kafka.KafkaUtils - Task [10/10] no partitions assigned
32742 [Thread-17-words] INFO  backtype.storm.daemon.executor - Activating spout words:(6)
32872 [Thread-29-words] INFO  backtype.storm.daemon.executor - Opened spout words:(12)
32742 [Thread-25-words] INFO  backtype.storm.daemon.executor - Activating spout words:(10)
32742 [Thread-21-words] INFO  backtype.storm.daemon.executor - Activating spout words:(8)
32742 [Thread-27-words] INFO  backtype.storm.daemon.executor - Activating spout words:(11)
32742 [Thread-19-words] INFO  backtype.storm.daemon.executor - Activating spout words:(7)
32741 [Thread-15-words] INFO  backtype.storm.daemon.executor - Activating spout words:(5)
32740 [Thread-13-words] WARN  storm.kafka.KafkaUtils - Task [2/10] no partitions assigned
32873 [Thread-29-words] INFO  backtype.storm.daemon.executor - Activating spout words:(12)
32872 [Thread-23-words] INFO  backtype.storm.daemon.executor - Opened spout words:(9)
32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Opened spout words:(4)
32873 [Thread-23-words] INFO  backtype.storm.daemon.executor - Activating spout words:(9)
32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Activating spout words:(4)
37756 [Thread-23-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
37757 [Thread-17-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
37757 [Thread-21-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
37757 [Thread-11-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
37773 [Thread-11-words] INFO  storm.kafka.PartitionManager - Read partition information from: /twitter/twitter-topic-id/partition_0  --> null
37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 185
37916 [Thread-11-words] INFO  backtype.storm.daemon.executor - Opened spout words:(3)
37917 [Thread-11-words] INFO  backtype.storm.daemon.executor - Activating spout words:(3)
62005 [Thread-11-words] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
62013 [Thread-13-words] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]

Date: Wed, 30 Mar 2016 10:10:54 +0530
Subject: Re: Storm KafkaSpout Integration
From: dkiralam@aadhya-analytics.com
To: user@storm.apache.org

Hi david,

I think everything is good but you are missing a statement
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); add it after the line
config.setDebug(true);

Best regards,
K.Sai Dilip Reddy.

On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <da...@hotmail.com> wrote:



Hi all,
I am currently trying use TestTopologyStaticHosts to try connect the KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance running on my localhost. I have a topic named "twitter-topic" that has some tweets in it. This is all working as expected. I can run the consumer in the terminal and it returns the tweets. I want to use the KafkaSpout to connect to the Kafka topic and pull the tweets into a topology. I have been working on this a few days now and no success.
So far i have learned that when Storm is run in local mode that it uses an in memory zookeeper on port 2000, which would not allow it to connect to the Kafka topic. I have tried to get around this using the following syntax that i found online:
LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
It is still not working but it seems to be connecting to Kafka as it gives a 'closed socket connection' message when i cancel the operation (after it does not work and hangs open). It also says in the storm output that it is connected to localhost 2181 so it seems to be getting that far. I have included the full output from Storm in a txt file attached.
Here is the code i am using in the TestTopologyStaticHosts class:
 public static void main(String[] args) throws Exception {
        //String zkConnString = "localhost:2181";
        GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
        hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
        BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
        // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");

        SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "twitter-topic", "/twitter", "twitter-topic-id");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        //kafkaConfig.forceStartOffsetTime(-2);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
        builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
        LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
        Config config = new Config();
        config.setDebug(true);
        // config.put("storm.zookeeper.servers", "localhost");
        // config.put("storm.zookeeper.port", "2181");
        cluster.submitTopology("kafka-test", config, builder.createTopology());
        Thread.sleep(600000);
    }
Judging by the output it seems that there is a problem with connecting to the Kafka partitions.
I have tried many different things to get it to work but no luck. I have also been looking at using the KafkaSpoutTestTopology class but it is expecting arguments including 'dockerIp' which i don't understand.
Should i be using Storm in localmode?
Should i be using the TestTopologyStaticHosts class or would the KafkaSpoutTestTopology class be better?
Any help at all would be greatly appreciated because i am really stuck.
Kind Regards
David Kavanagh

Re: Storm KafkaSpout Integration

Posted by Sai Dilip Reddy Kiralam <dk...@aadhya-analytics.com>.
Hi david,

Can I know how many partitions you are using?
The statement I gave you assumes the default. If you are running with more than one
partition, make sure you use that same number; e.g. if you are running with two
partitions, change the number to 2 in the statement:
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
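For example, with a two-partition topic the relevant part of the main method might look like the sketch below (the two-partition count is only an assumption for illustration; matching the spout parallelism to the partition count also avoids the "more tasks than partitions" warnings seen in the log):

        // sketch: assumes "twitter-topic" was created with 2 partitions
        builder.setSpout("words", new KafkaSpout(kafkaConfig), 2);   // one spout task per partition
        Config config = new Config();
        config.setDebug(true);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);            // the setting suggested above
        cluster.submitTopology("kafka-test", config, builder.createTopology());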


*Best regards,*
*K.Sai Dilip Reddy.*

On Wed, Mar 30, 2016 at 3:00 PM, david kavanagh <da...@hotmail.com>
wrote:

> Thanks for the reply!
>
> I added the line as you suggested but there is still no difference
> unfortunately.
> I am just guessing at this stage but judging by the output below, it
> seems like it is something to do with the partitioning or the offset.
> The warnings start by saying that there are more tasks than partitions.
> Task 1 is assigned the partition that is created in the code (highlighted
> in green), then the rest of the tasks are not assigned any partitions.
> Eventually it states 'Read partition information from:
> /twitter/twitter-topic-id/partition_0  --> null'
>
> So it seems like it is not reading data from Kafka at all. I really don't
> understand what is going on here.
> Any ideas?
>
>
> Kind Regards
>
> David
>
> --------------------------------------------------
>
> *Storm Output:*
>
> Thread-9-print] INFO  backtype.storm.daemon.executor - Prepared bolt
> print:(2)
> 32644 [Thread-11-words] INFO
>  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
> 32685 [Thread-19-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-19-words] WARN  storm.kafka.KafkaUtils - Task [5/10] no
> partitions assigned
> 32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - Task [4/10] no
> partitions assigned
> 32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - Task [3/10] no
> partitions assigned
> 32686 [Thread-11-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-11-words] INFO  storm.kafka.KafkaUtils - Task [1/10]
> assigned [Partition{host=127.0.0.1:9092, partition=0}]
> 32687 [Thread-29-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32697 [Thread-19-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-25-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-29-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-13-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-27-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-15-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32689 [Thread-19-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(7)
> 32689 [Thread-25-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32689 [Thread-15-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(5)
> 32738 [Thread-25-words] WARN  storm.kafka.KafkaUtils - Task [8/10] no
> partitions assigned
> 32738 [Thread-25-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(10)
> 32689 [Thread-17-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(6)
> 32688 [Thread-13-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32688 [Thread-21-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32739 [Thread-21-words] WARN  storm.kafka.KafkaUtils - Task [6/10] no
> partitions assigned
> 32739 [Thread-21-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(8)
> 32687 [Thread-27-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-27-words] WARN  storm.kafka.KafkaUtils - Task [9/10] no
> partitions assigned
> 32740 [Thread-27-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(11)
> 32687 [Thread-23-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-23-words] WARN  storm.kafka.KafkaUtils - Task [7/10] no
> partitions assigned
> 32736 [Thread-29-words] WARN  storm.kafka.KafkaUtils - Task [10/10] no
> partitions assigned
> 32742 [Thread-17-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(6)
> 32872 [Thread-29-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(12)
> 32742 [Thread-25-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(10)
> 32742 [Thread-21-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(8)
> 32742 [Thread-27-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(11)
> 32742 [Thread-19-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(7)
> 32741 [Thread-15-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(5)
> 32740 [Thread-13-words] WARN  storm.kafka.KafkaUtils - Task [2/10] no
> partitions assigned
> 32873 [Thread-29-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(12)
> 32872 [Thread-23-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(9)
> 32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(4)
> 32873 [Thread-23-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(9)
> 32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(4)
> 37756 [Thread-23-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 37757 [Thread-17-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 37757 [Thread-21-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 37757 [Thread-11-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 37773 [Thread-11-words] INFO  storm.kafka.PartitionManager - Read
> partition information from: /twitter/twitter-topic-id/partition_0  --> null
> 37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - No partition
> information found, using configuration to determine offset
> 37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - Starting
> Kafka 127.0.0.1:0 from offset 185
> 37916 [Thread-11-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(3)
> 37917 [Thread-11-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(3)
> 62005 [Thread-11-words] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 62013 [Thread-13-words] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
>
>
> ------------------------------
> Date: Wed, 30 Mar 2016 10:10:54 +0530
> Subject: Re: Storm KafkaSpout Integration
> From: dkiralam@aadhya-analytics.com
> To: user@storm.apache.org
>
>
> Hi david,
>
> I think everything is good but you are missing a statement
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); add it after the line
> config.setDebug(true);
>
> *Best regards,*
> *K.Sai Dilip Reddy.*
>
> On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <da...@hotmail.com>
> wrote:
>
> Hi all,
>
> I am currently trying use TestTopologyStaticHosts to try connect the
> KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance
> running on my localhost. I have a topic named "twitter-topic" that has some
> tweets in it. This is all working as expected. I can run the consumer in
> the terminal and it returns the tweets. I want to use the KafkaSpout to
> connect to the Kafka topic and pull the tweets into a topology. I have
> been working on this a few days now and no success.
>
> So far i have learned that when Storm is run in local mode that it uses an
> in memory zookeeper on port 2000, which would not allow it to connect to
> the Kafka topic. I have tried to get around this using the following syntax
> that i found online:
>
> LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>
> It is still not working but it seems to be connecting to Kafka as it gives
> a 'closed socket connection' message when i cancel the operation (after it
> does not work and hangs open). It also says in the storm output that it is
> connected to localhost 2181 so it seems to be getting that far. I have
> included the full output from Storm in a txt file attached.
>
> Here is the code i am using in the TestTopologyStaticHosts class:
>
>  public static void main(String[] args) throws Exception {
>
>         //String zkConnString = "localhost:2181";
>
>         GlobalPartitionInformation hostsAndPartitions = new
> GlobalPartitionInformation();
>         hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
>         BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
>         // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");
>
>         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts,
> "twitter-topic","/twitter","twitter-topic-id");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         //kafkaConfig.forceStartOffsetTime(-2);
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
>         builder.setBolt("print", new
> PrinterBolt()).shuffleGrouping("words");
>         LocalCluster cluster = new LocalCluster("localhost", new
> Long(2181));
>         Config config = new Config();
>         config.setDebug(true);
>         // config.put("storm.zookeeper.servers", "localhost");
>         // config.put("storm.zookeeper.port", "2181");
>         cluster.submitTopology("kafka-test", config,
> builder.createTopology());
>
>         Thread.sleep(600000);
>
>     }
>
> Judging by the output it seems that there is a problem with connecting to
> the Kafka partitions.
> I have tried many different things to get it to work but no luck. I have
> also been looking at using the KafkaSpoutTestTopology class but it is
> expecting arguments including 'dockerIp' which i don't understand.
>
> Should i be using Storm in localmode?
> Should i be using the TestTopologyStaticHosts class or would the KafkaSpoutTestTopology
> class be better?
>
> Any help at all would be greatly appreciated because i am really stuck.
>
> Kind Regards
> David Kavanagh
>
>
>

RE: Storm KafkaSpout Integration

Posted by david kavanagh <da...@hotmail.com>.
Thanks for the reply!
I added the line as you suggested but there is still no difference unfortunately. I am just guessing at this stage but judging by the output below, it seems like it is something to do with the partitioning or the offset.
The warnings start by saying that there are more tasks than partitions. Task 1 is assigned the partition that is created in the code (highlighted in green), then the rest of the tasks are not assigned any partitions.
Eventually it states 'Read partition information from: /twitter/twitter-topic-id/partition_0  --> null'

So it seems like it is not reading data from Kafka at all. I really don't understand what is going on here.
Any ideas?
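One thing the output suggests trying (a sketch only, not a verified fix): the topic has a single partition and no offsets are stored in ZooKeeper yet, so uncommenting the forceStartOffsetTime call that is already in the posted code and dropping the spout parallelism to 1 should make the one assigned task re-read the topic from the beginning instead of starting at the current end:

        // sketch: consume the single-partition topic from its earliest available offset
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafkaConfig.forceStartOffsetTime(-2);   // -2 = earliest offset, -1 = latest offset
        builder.setSpout("words", new KafkaSpout(kafkaConfig), 1);   // one task for the one partition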

Kind Regards
David
--------------------------------------------------
Storm Output:
Thread-9-print] INFO  backtype.storm.daemon.executor - Prepared bolt print:(2)
32644 [Thread-11-words] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
32685 [Thread-19-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32686 [Thread-19-words] WARN  storm.kafka.KafkaUtils - Task [5/10] no partitions assigned
32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - Task [4/10] no partitions assigned
32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - Task [3/10] no partitions assigned
32686 [Thread-11-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32686 [Thread-11-words] INFO  storm.kafka.KafkaUtils - Task [1/10] assigned [Partition{host=127.0.0.1:9092, partition=0}]
32687 [Thread-29-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32697 [Thread-19-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-25-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-29-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-13-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-27-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-15-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32689 [Thread-19-words] INFO  backtype.storm.daemon.executor - Opened spout words:(7)
32689 [Thread-25-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32689 [Thread-15-words] INFO  backtype.storm.daemon.executor - Opened spout words:(5)
32738 [Thread-25-words] WARN  storm.kafka.KafkaUtils - Task [8/10] no partitions assigned
32738 [Thread-25-words] INFO  backtype.storm.daemon.executor - Opened spout words:(10)
32689 [Thread-17-words] INFO  backtype.storm.daemon.executor - Opened spout words:(6)
32688 [Thread-13-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32688 [Thread-21-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32739 [Thread-21-words] WARN  storm.kafka.KafkaUtils - Task [6/10] no partitions assigned
32739 [Thread-21-words] INFO  backtype.storm.daemon.executor - Opened spout words:(8)
32687 [Thread-27-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32740 [Thread-27-words] WARN  storm.kafka.KafkaUtils - Task [9/10] no partitions assigned
32740 [Thread-27-words] INFO  backtype.storm.daemon.executor - Opened spout words:(11)
32687 [Thread-23-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32740 [Thread-23-words] WARN  storm.kafka.KafkaUtils - Task [7/10] no partitions assigned
32736 [Thread-29-words] WARN  storm.kafka.KafkaUtils - Task [10/10] no partitions assigned
32742 [Thread-17-words] INFO  backtype.storm.daemon.executor - Activating spout words:(6)
32872 [Thread-29-words] INFO  backtype.storm.daemon.executor - Opened spout words:(12)
32742 [Thread-25-words] INFO  backtype.storm.daemon.executor - Activating spout words:(10)
32742 [Thread-21-words] INFO  backtype.storm.daemon.executor - Activating spout words:(8)
32742 [Thread-27-words] INFO  backtype.storm.daemon.executor - Activating spout words:(11)
32742 [Thread-19-words] INFO  backtype.storm.daemon.executor - Activating spout words:(7)
32741 [Thread-15-words] INFO  backtype.storm.daemon.executor - Activating spout words:(5)
32740 [Thread-13-words] WARN  storm.kafka.KafkaUtils - Task [2/10] no partitions assigned
32873 [Thread-29-words] INFO  backtype.storm.daemon.executor - Activating spout words:(12)
32872 [Thread-23-words] INFO  backtype.storm.daemon.executor - Opened spout words:(9)
32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Opened spout words:(4)
32873 [Thread-23-words] INFO  backtype.storm.daemon.executor - Activating spout words:(9)
32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Activating spout words:(4)
37756 [Thread-23-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
37757 [Thread-17-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
37757 [Thread-21-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
37757 [Thread-11-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
37773 [Thread-11-words] INFO  storm.kafka.PartitionManager - Read partition information from: /twitter/twitter-topic-id/partition_0  --> null
37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 185
37916 [Thread-11-words] INFO  backtype.storm.daemon.executor - Opened spout words:(3)
37917 [Thread-11-words] INFO  backtype.storm.daemon.executor - Activating spout words:(3)
62005 [Thread-11-words] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
62013 [Thread-13-words] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]

Date: Wed, 30 Mar 2016 10:10:54 +0530
Subject: Re: Storm KafkaSpout Integration
From: dkiralam@aadhya-analytics.com
To: user@storm.apache.org

Hi david,

I think everything is good but you are missing a statement
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); add it after the line
config.setDebug(true);

Best regards,
K.Sai Dilip Reddy.

On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <da...@hotmail.com> wrote:



Hi all,
I am currently trying use TestTopologyStaticHosts to try connect the KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance running on my localhost. I have a topic named "twitter-topic" that has some tweets in it. This is all working as expected. I can run the consumer in the terminal and it returns the tweets. I want to use the KafkaSpout to connect to the Kafka topic and pull the tweets into a topology. I have been working on this a few days now and no success.
So far i have learned that when Storm is run in local mode that it uses an in memory zookeeper on port 2000, which would not allow it to connect to the Kafka topic. I have tried to get around this using the following syntax that i found online:
LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
It is still not working but it seems to be connecting to Kafka as it gives a 'closed socket connection' message when i cancel the operation (after it does not work and hangs open). It also says in the storm output that it is connected to localhost 2181 so it seems to be getting that far. I have included the full output from Storm in a txt file attached.
Here is the code i am using in the TestTopologyStaticHosts class:
 public static void main(String[] args) throws Exception {
        //String zkConnString = "localhost:2181";
        GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
        hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
        BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
        // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");

        SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "twitter-topic", "/twitter", "twitter-topic-id");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        //kafkaConfig.forceStartOffsetTime(-2);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
        builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
        LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
        Config config = new Config();
        config.setDebug(true);
        // config.put("storm.zookeeper.servers", "localhost");
        // config.put("storm.zookeeper.port", "2181");
        cluster.submitTopology("kafka-test", config, builder.createTopology());
        Thread.sleep(600000);
    }
Judging by the output it seems that there is a problem with connecting to the Kafka partitions.
I have tried many different things to get it to work but no luck. I have also been looking at using the KafkaSpoutTestTopology class but it is expecting arguments including 'dockerIp' which i don't understand.
Should i be using Storm in localmode?
Should i be using the TestTopologyStaticHosts class or would the KafkaSpoutTestTopology class be better?
Any help at all would be greatly appreciated because i am really stuck.
Kind Regards
David Kavanagh

Re: Storm KafkaSpout Integration

Posted by Sai Dilip Reddy Kiralam <dk...@aadhya-analytics.com>.
Hi david,

I think everything is good but you are missing a statement
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); add it after the line
config.setDebug(true);
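In context, the end of the main method would then read something like this (a sketch based on the code from the original mail):

        Config config = new Config();
        config.setDebug(true);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);   // the statement suggested above
        cluster.submitTopology("kafka-test", config, builder.createTopology());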



*Best regards,*
*K.Sai Dilip Reddy.*

On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <da...@hotmail.com>
wrote:

> Hi all,
>
> I am currently trying use TestTopologyStaticHosts to try connect the
> KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance
> running on my localhost. I have a topic named "twitter-topic" that has some
> tweets in it. This is all working as expected. I can run the consumer in
> the terminal and it returns the tweets. I want to use the KafkaSpout to
> connect to the Kafka topic and pull the tweets into a topology. I have
> been working on this a few days now and no success.
>
> So far i have learned that when Storm is run in local mode that it uses an
> in memory zookeeper on port 2000, which would not allow it to connect to
> the Kafka topic. I have tried to get around this using the following syntax
> that i found online:
>
> LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>
> It is still not working but it seems to be connecting to Kafka as it gives
> a 'closed socket connection' message when i cancel the operation (after it
> does not work and hangs open). It also says in the storm output that it is
> connected to localhost 2181 so it seems to be getting that far. I have
> included the full output from Storm in a txt file attached.
>
> Here is the code i am using in the TestTopologyStaticHosts class:
>
>  public static void main(String[] args) throws Exception {
>
>         //String zkConnString = "localhost:2181";
>
>         GlobalPartitionInformation hostsAndPartitions = new
> GlobalPartitionInformation();
>         hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
>         BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
>         // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");
>
>         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts,
> "twitter-topic","/twitter","twitter-topic-id");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         //kafkaConfig.forceStartOffsetTime(-2);
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
>         builder.setBolt("print", new
> PrinterBolt()).shuffleGrouping("words");
>         LocalCluster cluster = new LocalCluster("localhost", new
> Long(2181));
>         Config config = new Config();
>         config.setDebug(true);
>         // config.put("storm.zookeeper.servers", "localhost");
>         // config.put("storm.zookeeper.port", "2181");
>         cluster.submitTopology("kafka-test", config,
> builder.createTopology());
>
>         Thread.sleep(600000);
>
>     }
>
> Judging by the output it seems that there is a problem with connecting to
> the Kafka partitions.
> I have tried many different things to get it to work but no luck. I have
> also been looking at using the KafkaSpoutTestTopology class but it is
> expecting arguments including 'dockerIp' which i don't understand.
>
> Should i be using Storm in localmode?
> Should i be using the TestTopologyStaticHosts class or would the KafkaSpoutTestTopology
> class be better?
>
> Any help at all would be greatly appreciated because i am really stuck.
>
> Kind Regards
> David Kavanagh
>
>