Posted to user@storm.apache.org by Palak Shah <sp...@gmail.com> on 2014/07/30 08:17:01 UTC

Kafka Spout not reading from Topic

Hi,

I am using the Kafka spout that is integrated in the
apache-storm-0.9.2-incubating release. I can submit the topology to
my Storm cluster, but it is not receiving any tuples from the Kafka topic.
I know the topic ("page_visits") has data because I can read it from the
console.

Here is my topology code:

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // ZooKeeper connect string; the spout discovers the Kafka brokers through it
        BrokerHosts zkHost = new ZkHosts("localhost:2181");
        SpoutConfig spoutConfig = new SpoutConfig(
                  zkHost,        // ZooKeeper hosts used to look up the Kafka brokers
                  "page_visits", // topic to read from
                  "/zkroot",     // root path in ZooKeeper where the spout stores the consumer offsets
                  "discovery");  // id for this consumer, used when storing the consumer offsets in ZooKeeper
        spoutConfig.forceFromStart = true;
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        builder.setBolt("kafkaBolt", new PrinterBolt()).shuffleGrouping("kafkaSpout");

        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setDebug(true);

        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

    }

}

I am using apache-storm-0.9.2-incubating and kafka-0.9.2-0.8.1.1. Is this a
version compatibility issue? If so, which versions should I use for this to
work?

Thanks in Advance,
Palak Shah

Re: Kafka Spout not reading from Topic

Posted by Sa Li <sa...@gmail.com>.
Hi, all

I had a similar issue, but I am using storm-kafka (https://github.com/apache/incubator-storm/tree/master/external/storm-kafka) with a TridentTopology, which is convenient for parsing messages when the tuple is in JSON form. Here is my code:

public static class PrintStream implements Filter {

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
    }

    @Override
    public void cleanup() {
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        System.out.println(tuple);
        return true;
    }
}

public static StormTopology buildTopology(LocalDRPC drpc) throws IOException {
    TridentTopology topology = new TridentTopology();
    BrokerHosts zk = new ZkHosts("localhost");
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

    topology.newStream("kafka", spout)
            .each(new Fields("str"), new PrintStream());

    return topology.build();
}

public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setDebug(true);
    conf.setMaxSpoutPending(1);
    conf.setMaxTaskParallelism(3);
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("kafka", conf, buildTopology(drpc));
    Thread.sleep(100);
    cluster.shutdown();
}
	       
I expected it to print the messages generated by a Kafka producer (which I can read with kafka-console-consumer). The code runs, but nothing is displayed on screen:
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO backtype.storm.daemon.supervisor - Starting Supervisor with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//724c7721-297a-42aa-87b1-7c3bc8a4dd24", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, 
"task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1 2 3), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.optimize" true, "topology.max.task.parallelism" nil}
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO backtype.storm.daemon.supervisor - Starting supervisor with id ee70aeee-1404-446b-97b1-2f432ec42a3d at host 192.168.128.10
[main] INFO backtype.storm.daemon.supervisor - Starting Supervisor with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//44b7d975-ab03-4aed-91e6-bceb6f4d0042", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, 
"task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (4 5 6), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.optimize" true, "topology.max.task.parallelism" nil}
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
[main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
[main] INFO backtype.storm.daemon.supervisor - Starting supervisor with id 16a5efae-8c0a-40d4-a4c1-c232937f4ed7 at host 192.168.128.10
[main] INFO backtype.storm.daemon.nimbus - Received topology submission for kafka with conf {"topology.acker.executors" nil, "topology.kryo.register" {"storm.trident.topology.TransactionAttempt" nil}, "topology.kryo.decorators" (), "topology.name" "kafka", "storm.id" "kafka-1-1406704138", "topology.max.task.parallelism" 3, "topology.debug" true, "topology.max.spout.pending" 1}
[main] INFO backtype.storm.daemon.nimbus - Activating kafka: kafka-1-1406704138
[main] INFO backtype.storm.scheduler.EvenScheduler - Available slots: (["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1] ["ee70aeee-1404-446b-97b1-2f432ec42a3d" 2] ["ee70aeee-1404-446b-97b1-2f432ec42a3d" 3] ["16a5efae-8c0a-40d4-a4c1-c232937f4ed7" 4] ["16a5efae-8c0a-40d4-a4c1-c232937f4ed7" 5] ["16a5efae-8c0a-40d4-a4c1-c232937f4ed7" 6])
[main] INFO backtype.storm.daemon.nimbus - Setting new assignment for topology id kafka-1-1406704138: #backtype.storm.daemon.common.Assignment{:master-code-dir "/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//d170038a-0f91-4124-b37c-2b5008188baa/nimbus/stormdist/kafka-1-1406704138", :node->host {"ee70aeee-1404-446b-97b1-2f432ec42a3d" "192.168.128.10"}, :executor->node+port {[3 3] ["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1], [5 5] ["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1], [4 4] ["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1], [2 2] ["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1], [1 1] ["ee70aeee-1404-446b-97b1-2f432ec42a3d" 1]}, :executor->start-time-secs {[1 1] 1406704139, [2 2] 1406704139, [4 4] 1406704139, [5 5] 1406704139, [3 3] 1406704139}}
[main] INFO backtype.storm.daemon.nimbus - Shutting down master
[main] INFO backtype.storm.daemon.nimbus - Shut down master
[main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor ee70aeee-1404-446b-97b1-2f432ec42a3d
[Thread-8] INFO backtype.storm.event - Event manager interrupted
[Thread-9] INFO backtype.storm.event - Event manager interrupted
[main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor 16a5efae-8c0a-40d4-a4c1-c232937f4ed7
[Thread-12] INFO backtype.storm.event - Event manager interrupted
[Thread-13] INFO backtype.storm.event - Event manager interrupted
[main] INFO backtype.storm.testing - Shutting down in process zookeeper
[main] INFO backtype.storm.testing - Done shutting down in process zookeeper
[main] INFO backtype.storm.testing - Deleting temporary path /var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//d170038a-0f91-4124-b37c-2b5008188baa
[main] INFO backtype.storm.testing - Deleting temporary path /var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//4b0cc44b-8d8c-459c-b400-03769fe2a7df
[main] INFO backtype.storm.testing - Deleting temporary path /var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//724c7721-297a-42aa-87b1-7c3bc8a4dd24
[main] INFO backtype.storm.testing - Deleting temporary path /var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T//44b7d975-ab03-4aed-91e6-bceb6f4d0042

Any idea about this problem?

thanks

Alec


Re: Kafka Spout not reading from Topic

Posted by Palak Shah <sp...@gmail.com>.
Hi all,

I solved my problem by adding the maven-assembly-plugin and creating a
jar-with-dependencies file. I used this to run my code and it worked.

-Palak


Re: Kafka Spout not reading from Topic

Posted by Sa Li <sa...@gmail.com>.
Hi, thanks for the reply. I did check with zkCli:
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
[consumers, controller, brokers, zookeeper, controller_epoch]
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /brokers
[topics, ids]

[zk: 127.0.0.1:2181(CONNECTED) 3] get /brokers
null
cZxid = 0x631
ctime = Thu Jul 17 20:48:15 PDT 2014
mZxid = 0x631
mtime = Thu Jul 17 20:48:15 PDT 2014
pZxid = 0x63f
cversion = 2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2

The topic I created is ingest_topic. How can I find it from zkCli? I can use kafka-console-consumer to consume the messages.

Thanks

Alec


Re: Kafka Spout not reading from Topic

Posted by Parth Brahmbhatt <pb...@hortonworks.com>.
Hi,

I don't see anything wrong with your code. Can you confirm that ZooKeeper is
actually running on localhost:2181? Then log into the ZooKeeper CLI (zkCli)
and check that "ls /" lists a "brokers" node, under which you should be
able to find your topic. That is the default path that ZkHosts.java
looks under.
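
A minimal sketch of the paths to check, assuming the Kafka 0.8 ZooKeeper
layout (broker ids under /brokers/ids, topics under /brokers/topics/<topic>).
The class KafkaZkPaths and its methods are hypothetical helpers for
illustration only, not part of storm-kafka; the program just prints the zkCli
commands to try.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of storm-kafka): builds the ZooKeeper paths
// under which Kafka 0.8 is assumed to register brokers and topics.
final class KafkaZkPaths {

    // Default broker root, i.e. the "brokers" node mentioned above.
    static final String DEFAULT_BROKER_ROOT = "/brokers";

    // Node that holds the metadata for a single topic.
    static String topicPath(String brokerRoot, String topic) {
        return brokerRoot + "/topics/" + topic;
    }

    // Node whose children are the partition ids of the topic.
    static String partitionsPath(String brokerRoot, String topic) {
        return topicPath(brokerRoot, topic) + "/partitions";
    }

    // zkCli commands to run, in order, to confirm the topic is visible.
    static List<String> zkCliCommands(String brokerRoot, String topic) {
        return Arrays.asList(
                "ls " + brokerRoot + "/ids",
                "ls " + brokerRoot + "/topics",
                "ls " + partitionsPath(brokerRoot, topic));
    }

    public static void main(String[] args) {
        for (String command : zkCliCommands(DEFAULT_BROKER_ROOT, "page_visits")) {
            System.out.println(command);
        }
    }
}
```

If "ls /brokers/topics" does not list the topic name, the spout is probably
pointed at a different ZooKeeper (or a different root path) than the one the
Kafka brokers registered with.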

Thanks
Parth
