Posted to user@storm.apache.org by Ryan Bliton <ry...@gmail.com> on 2017/10/11 14:53:32 UTC

New to Storm- KafkaSpout won't emit Tuples in Cluster mode

Hi! I'm trying to get a starter Kafka-Storm integration going. I've got a
simple topology working in Local mode: it reads messages from a Kafka
topic and sends them to a bolt that logs them. However, when I try to
submit the topology to a cluster, the Storm UI always reads 0 tuples
emitted from the KafkaSpout.

I've done several laps around the internet at this point, and built and
tried different starter projects; each has the same issue. I can submit the
topology, but it won't actually work.

Similar problems to mine seem to come from the Storm /lib directory and
incompatible .jar files within it. I haven't found anything like that in my
case; however, I'm not 100% sure what I should be looking for, so I can't
rule it out.

I don't know how to make code look pretty on a mailing list, so here is a
Stack Overflow question about my issue:

https://stackoverflow.com/questions/46676377/apache-storm-kafka-cant-see-sent-kafka-messages-in-storm-ui

I make sure to run "storm supervisor" before testing.

I have Zookeeper running on port 2181.

I spin up a Kafka broker and use the topic storm-test-topic1.

I fire up a console Kafka producer to send nonsense messages.

storm.yaml:
########### These MUST be filled in for a storm configuration
 storm.zookeeper.servers:
     - "localhost"
#     - "server2"
#
 nimbus.seeds: ["localhost"]
#
#

----------------------------------------------------------------------------------------------
Topology:

package com.kafka.storm;

import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

import com.kafka.storm.bolt.LoggerBolt;

public class KafkaStormIntegrationDemo {
    private static final Logger LOG = Logger.getLogger(KafkaStormIntegrationDemo.class);

    public static void main(String[] args) throws InvalidTopologyException,
            AuthorizationException, AlreadyAliveException {

        // Build the spout configuration
        final BrokerHosts zkrHosts = new ZkHosts("localhost:2181");
        final String kafkaTopic = "storm-test-topic1";
        final String zkRoot = "";
        final String clientId = "storm-consumer";
        SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId);
        // -2 is kafka.api.OffsetRequest.EarliestTime(): start from the earliest offset
        kafkaConf.startOffsetTime = -2;
        kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Build a topology that consumes messages from Kafka and logs them
        final TopologyBuilder topologyBuilder = new TopologyBuilder();
        // Create a KafkaSpout instance using the Kafka configuration and add it to the topology
        topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
        // Route the output of the Kafka spout to the logger bolt
        topologyBuilder.setBolt("print-messages", new LoggerBolt()).globalGrouping("kafka-spout");

        // Submit the topology to the cluster via Nimbus
        Config conf = new Config();
        System.setProperty("storm.jar", "C://apache-storm-1.1.1/lib/storm-core-1.1.1.jar");
        StormSubmitter.submitTopology("kafkaTopology", conf, topologyBuilder.createTopology());
    }
}
----------------------------------------------------------------------------------------------

Bolt:

package com.kafka.storm.bolt;

import org.apache.log4j.Logger;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class LoggerBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = Logger.getLogger(LoggerBolt.class);

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        LOG.info(input.getString(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}


Thank you in advance for any help you can give, or just for reading!

Re: New to Storm- KafkaSpout won't emit Tuples in Cluster mode

Posted by Stig Rohde Døssing <sr...@apache.org>.
I just noticed that you're on Windows. Please ensure you run Storm as
administrator. The supervisor needs to create some symlinks when it starts
workers, and Windows doesn't allow regular users to do that.

When I run the supervisor without admin rights, the workers fail to start,
both on 1.1.1 and 2.0.0. I haven't investigated why this failure is quiet
in 1.1.1, but in 2.0.0 at least I get a meaningful error in the log:

java.nio.file.FileSystemException:
E:\apache-storm-2.0.0-SNAPSHOT\storm-local\workers\d5b028c0-5332-448b-8fad-a1846794c92d\artifacts:
A required privilege is not held by the client.

    at
sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
~[?:1.8.0_144]
    at
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
~[?:1.8.0_144]
    at
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
~[?:1.8.0_144]
    at
sun.nio.fs.WindowsFileSystemProvider.createSymbolicLink(WindowsFileSystemProvider.java:585)
~[?:1.8.0_144]
    at java.nio.file.Files.createSymbolicLink(Files.java:1043)
~[?:1.8.0_144]
    at
org.apache.storm.daemon.supervisor.AdvancedFSOps.createSymlink(AdvancedFSOps.java:371)
~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at
org.apache.storm.daemon.supervisor.Container.createArtifactsLink(Container.java:442)
~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at
org.apache.storm.daemon.supervisor.Container.setup(Container.java:379)
~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at
org.apache.storm.daemon.supervisor.BasicContainerLauncher.launchContainer(BasicContainerLauncher.java:46)
~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at
org.apache.storm.daemon.supervisor.Slot.handleWaitingForBlobLocalization(Slot.java:422)
~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at
org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:308)
~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:805)
[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
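
The privilege check that fails above can be reproduced outside Storm with a small, self-contained sketch (this snippet is mine, not from the thread; the class name SymlinkCheck is made up). It attempts the same java.nio call the supervisor uses, Files.createSymbolicLink, and reports whether the current user may create symlinks:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: tries the same Files.createSymbolicLink call that
// the supervisor's AdvancedFSOps performs when setting up a worker.
// On Windows without admin rights this throws FileSystemException:
// "A required privilege is not held by the client."
public class SymlinkCheck {
    public static boolean canCreateSymlink() {
        try {
            Path dir = Files.createTempDirectory("storm-symlink-check");
            Path target = Files.createFile(dir.resolve("target"));
            Path link = dir.resolve("artifacts"); // mirrors the failing directory name
            Files.createSymbolicLink(link, target);
            return Files.isSymbolicLink(link);
        } catch (IOException | SecurityException | UnsupportedOperationException e) {
            return false; // privilege not held, or filesystem lacks symlink support
        }
    }

    public static void main(String[] args) {
        System.out.println("can create symlinks: " + canCreateSymlink());
    }
}
```

If this prints false on the supervisor machine, running the shell as administrator should let the workers start.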

2017-10-11 23:46 GMT+02:00 Ryan Bliton <ry...@gmail.com>:

> you are right, fixing the display bug did not affect the cluster at all.
> Thank you again for taking the time. I'm just about ready to give up, I've
> been working on this all week.
>
> I have Kafka, Zookeeper, Nimbus, supervisor, and UI all running in
> localhost, I am not connecting to any other machines.
>
> I called Supervisor again- the UI registers that there are executors for a
> minute and then they go away having done nothing. Here are the logs of
> Nimbus and Supervisor around the time I called storm supervisor.
>
> In the logs I swapped my actual PC's name with [MY PC NAME] before posting.
>
> Supervisor log:
>
> 2017-10-11 17:26:20.552 o.a.s.z.Zookeeper main [INFO] Staring ZK Curator
> 2017-10-11 17:26:20.563 o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl main
> [INFO] Starting
> 2017-10-11 17:26:20.579 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
> 2017-10-11 17:26:20.580 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:host.name=[MY PC NAME]
> 2017-10-11 17:26:20.580 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:java.version=1.8.0_131
> 2017-10-11 17:26:20.581 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:java.vendor=Oracle Corporation
> 2017-10-11 17:26:20.582 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:java.home=C:\JAVA\jdk1.8.0_131\jre
> 2017-10-11 17:26:20.582 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:java.class.path=C:\apache-storm-1.1.1\*;C:\
> apache-storm-1.1.1\conf;C:\JAVA\jdk1.8.0_131\lib\tools.
> jar;C:\apache-storm-1.1.1\lib\asm-5.0.3.jar;C:\apache-storm-
> 1.1.1\lib\clojure-1.7.0.jar;C:\apache-storm-1.1.1\lib\
> disruptor-3.3.2.jar;C:\apache-storm-1.1.1\lib\kryo-3.0.3.
> jar;C:\apache-storm-1.1.1\lib\log4j-api-2.8.2.jar;C:\apache-
> storm-1.1.1\lib\log4j-core-2.8.2.jar;C:\apache-storm-1.1.1\
> lib\log4j-over-slf4j-1.6.6.jar;C:\apache-storm-1.1.1\lib\
> log4j-slf4j-impl-2.8.2.jar;C:\apache-storm-1.1.1\lib\minlog-
> 1.3.0.jar;C:\apache-storm-1.1.1\lib\objenesis-2.1.jar;C:\
> apache-storm-1.1.1\lib\reflectasm-1.10.1.jar;C:\
> apache-storm-1.1.1\lib\ring-cors-0.1.5.jar;C:\apache-
> storm-1.1.1\lib\servlet-api-2.5.jar;C:\apache-storm-1.1.1\
> lib\slf4j-api-1.7.21.jar;C:\apache-storm-1.1.1\lib\storm-
> core-1.1.1.jar;C:\apache-storm-1.1.1\lib\storm-rename-hack-1.1.1.jar
> 2017-10-11 17:26:20.582 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:java.library.path=/usr/local/lib:/opt/local/lib:
> /usr/lib;C:\JAVA\jdk1.8.0_131\bin;C:\JAVA\jdk1.8.0_131\lib;
> C:\JAVA\jdk1.8.0_131\jre\bin;C:\JAVA\jdk1.8.0_131\jre\lib
> 2017-10-11 17:26:20.582 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:java.io.tmpdir=C:\Users\rbliton\AppData\Local\Temp\
> 2017-10-11 17:26:20.582 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:java.compiler=<NA>
> 2017-10-11 17:26:20.582 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:os.name=Windows 10
> 2017-10-11 17:26:20.583 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:os.arch=amd64
> 2017-10-11 17:26:20.583 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:os.version=10.0
> 2017-10-11 17:26:20.583 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:user.name=rbliton
> 2017-10-11 17:26:20.583 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:user.home=C:\Users\rbliton
> 2017-10-11 17:26:20.583 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
> environment:user.dir=C:\apache-storm-1.1.1\bin
> 2017-10-11 17:26:20.585 o.a.s.s.o.a.z.ZooKeeper main [INFO] Initiating
> client connection, connectString=localhost:2181 sessionTimeout=20000
> watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@2216effc
> 2017-10-11 17:26:20.801 o.a.s.s.o.a.z.ClientCnxn main-SendThread(
> 127.0.0.1:2181) [INFO] Opening socket connection to server
> 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL
> (unknown error)
> 2017-10-11 17:26:20.803 o.a.s.s.o.a.z.ClientCnxn main-SendThread(
> 127.0.0.1:2181) [INFO] Socket connection established to
> 127.0.0.1/127.0.0.1:2181, initiating session
> 2017-10-11 17:26:20.826 o.a.s.s.o.a.z.ClientCnxn main-SendThread(
> 127.0.0.1:2181) [INFO] Session establishment complete on server
> 127.0.0.1/127.0.0.1:2181, sessionid = 0x15f0d3b46a70016, negotiated
> timeout = 20000
> 2017-10-11 17:26:20.845 o.a.s.s.o.a.c.f.s.ConnectionStateManager
> main-EventThread [INFO] State change: CONNECTED
> 2017-10-11 17:26:20.856 o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl
> Curator-Framework-0 [INFO] backgroundOperationsLoop exiting
> 2017-10-11 17:26:20.867 o.a.s.s.o.a.z.ZooKeeper main [INFO] Session:
> 0x15f0d3b46a70016 closed
> 2017-10-11 17:26:20.869 o.a.s.z.Zookeeper main [INFO] Staring ZK Curator
> 2017-10-11 17:26:20.870 o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl main
> [INFO] Starting
> 2017-10-11 17:26:20.869 o.a.s.s.o.a.z.ClientCnxn main-EventThread [INFO]
> EventThread shut down
> 2017-10-11 17:26:20.870 o.a.s.s.o.a.z.ZooKeeper main [INFO] Initiating
> client connection, connectString=localhost:2181/storm
> sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.
> ConnectionState@4417af13
> 2017-10-11 17:26:20.880 o.a.s.s.o.a.z.ClientCnxn
> main-SendThread(0:0:0:0:0:0:0:1:2181) [INFO] Opening socket connection to
> server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181. Will not attempt to
> authenticate using SASL (unknown error)
> 2017-10-11 17:26:20.881 o.a.s.s.o.a.z.ClientCnxn
> main-SendThread(0:0:0:0:0:0:0:1:2181) [INFO] Socket connection
> established to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181, initiating session
> 2017-10-11 17:26:20.932 o.a.s.s.o.a.z.ClientCnxn
> main-SendThread(0:0:0:0:0:0:0:1:2181) [INFO] Session establishment
> complete on server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181, sessionid =
> 0x15f0d3b46a70017, negotiated timeout = 20000
> 2017-10-11 17:26:20.933 o.a.s.s.o.a.c.f.s.ConnectionStateManager
> main-EventThread [INFO] State change: CONNECTED
> 2017-10-11 17:26:20.952 o.a.s.l.Localizer main [INFO] Reconstruct
> localized resource: C:\apache-storm-1.1.1\storm-local\supervisor\usercache
> 2017-10-11 17:26:20.954 o.a.s.l.Localizer main [WARN] No left over
> resources found for any user during reconstructing of local resources at:
> C:\apache-storm-1.1.1\storm-local\supervisor\usercache
> 2017-10-11 17:26:20.962 o.a.s.d.s.Supervisor main [INFO] Starting
> supervisor for storm version '1.1.1'.
> 2017-10-11 17:26:21.005 o.a.s.d.s.Slot main [WARN] SLOT [MY PC NAME]:6700
> Starting in state EMPTY - assignment null
> 2017-10-11 17:26:21.006 o.a.s.d.s.Slot main [WARN] SLOT [MY PC NAME]:6701
> Starting in state EMPTY - assignment null
> 2017-10-11 17:26:21.006 o.a.s.d.s.Slot main [WARN] SLOT [MY PC NAME]:6701
> Starting in state EMPTY - assignment null
> 2017-10-11 17:26:21.009 o.a.s.d.s.Slot main [WARN] SLOT [MY PC NAME]:6703
> Starting in state EMPTY - assignment null
> 2017-10-11 17:26:21.020 o.a.s.d.s.Container main [INFO] GET worker-user
> for 964b34c1-d7e3-4f22-9ee2-a0664c10294e
> 2017-10-11 17:26:21.022 o.a.s.d.s.Container main [INFO] Cleaning up
> 5ef774ff-f24c-4810-9330-5e1291bd7539:964b34c1-d7e3-4f22-9ee2-a0664c10294e
> 2017-10-11 17:26:21.023 o.a.s.d.s.Container main [INFO] GET worker-user
> for 964b34c1-d7e3-4f22-9ee2-a0664c10294e
> 2017-10-11 17:26:21.024 o.a.s.d.s.AdvancedFSOps main [INFO] Deleting path
> C:\apache-storm-1.1.1\storm-local\workers\964b34c1-d7e3-
> 4f22-9ee2-a0664c10294e\heartbeats
> 2017-10-11 17:26:21.029 o.a.s.d.s.AdvancedFSOps main [INFO] Deleting path
> C:\apache-storm-1.1.1\storm-local\workers\964b34c1-d7e3-
> 4f22-9ee2-a0664c10294e\pids
> 2017-10-11 17:26:21.032 o.a.s.d.s.AdvancedFSOps main [INFO] Deleting path
> C:\apache-storm-1.1.1\storm-local\workers\964b34c1-d7e3-
> 4f22-9ee2-a0664c10294e\tmp
> 2017-10-11 17:26:21.034 o.a.s.d.s.AdvancedFSOps main [INFO] Deleting path
> C:\apache-storm-1.1.1\storm-local\workers\964b34c1-d7e3-
> 4f22-9ee2-a0664c10294e
>
> Nimbus logs:
>
> 2017-10-11 17:26:22.189 o.a.s.d.nimbus timer [INFO] Executor
> storm-kafka-topology1-2-1507756303:[2 2] not alive
> 2017-10-11 17:26:22.189 o.a.s.d.nimbus timer [INFO] Executor
> storm-kafka-topology1-2-1507756303:[3 3] not alive
> 2017-10-11 17:26:22.189 o.a.s.d.nimbus timer [INFO] Executor
> storm-kafka-topology1-2-1507756303:[1 1] not alive
> 2017-10-11 17:26:22.189 o.a.s.d.nimbus timer [INFO] Executor
> storm-kafka-topology1-2-1507756303:[4 4] not alive
> 2017-10-11 17:26:22.190 o.a.s.d.nimbus timer [INFO] Executor
> kafkaTopology-1-1507755826:[2 2] not alive
> 2017-10-11 17:26:22.190 o.a.s.d.nimbus timer [INFO] Executor
> kafkaTopology-1-1507755826:[3 3] not alive
> 2017-10-11 17:26:22.190 o.a.s.d.nimbus timer [INFO] Executor
> kafkaTopology-1-1507755826:[1 1] not alive
> 2017-10-11 17:26:22.192 o.a.s.s.EvenScheduler timer [INFO] Available
> slots: (["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700]
> ["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701] ["5ef774ff-f24c-4810-9330-5e1291bd7539"
> 6702] ["5ef774ff-f24c-4810-9330-5e1291bd7539" 6703])
> 2017-10-11 17:26:22.192 o.a.s.s.EvenScheduler timer [INFO] Available
> slots: (["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701]
> ["5ef774ff-f24c-4810-9330-5e1291bd7539" 6702] ["5ef774ff-f24c-4810-9330-5e1291bd7539"
> 6703])
> 2017-10-11 17:26:22.195 o.a.s.d.nimbus timer [INFO] Setting new assignment
> for topology id storm-kafka-topology1-2-1507756303:
> #org.apache.storm.daemon.common.Assignment{:master-code-dir
> "storm-local", :node->host {"5ef774ff-f24c-4810-9330-5e1291bd7539" "
> LNAR-PC0611K6.corp.capgemini.com"}, :executor->node+port {[4 4]
> ["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701], [3 3]
> ["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701], [2 2]
> ["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701], [1 1]
> ["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701]},
> :executor->start-time-secs {[1 1] 1507757182, [2 2] 1507757182, [3 3]
> 1507757182, [4 4] 1507757182}, :worker->resources
> {["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701] [0.0 0.0 0.0]}, :owner
> "rbliton"}
> 2017-10-11 17:26:22.201 o.a.s.d.nimbus timer [INFO] Setting new assignment
> for topology id kafkaTopology-1-1507755826: #org.apache.storm.daemon.
> common.Assignment{:master-code-dir "storm-local", :node->host
> {"5ef774ff-f24c-4810-9330-5e1291bd7539" "LNAR-PC0611K6.corp.capgemini.com"},
> :executor->node+port {[2 2] ["5ef774ff-f24c-4810-9330-5e1291bd7539"
> 6700], [1 1] ["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700], [3 3]
> ["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700]},
> :executor->start-time-secs {[1 1] 1507757182, [2 2] 1507757182, [3 3]
> 1507757182}, :worker->resources {["5ef774ff-f24c-4810-9330-5e1291bd7539"
> 6700] [0.0 0.0 0.0]}, :owner "rbliton"}
> 2017-10-11 17:26:23.209 o.a.s.d.nimbus pool-14-thread-25 [INFO] Created
> download session for kafkaTopology-1-1507755826-stormjar.jar with id
> bed36ea8-1d05-4d80-8006-c2bfb40ed8f6
> 2017-10-11 17:26:23.801 o.a.s.d.nimbus pool-14-thread-7 [INFO] Created
> download session for kafkaTopology-1-1507755826-stormcode.ser with id
> 7227688a-4381-48d5-ac27-c6a6e03f5a5d
> 2017-10-11 17:26:23.837 o.a.s.d.nimbus pool-14-thread-11 [INFO] Created
> download session for kafkaTopology-1-1507755826-stormconf.ser with id
> d4d6712b-55ec-46fe-8e9c-6df068d65133
> 2017-10-11 17:26:23.913 o.a.s.d.nimbus pool-14-thread-20 [INFO] Created
> download session for storm-kafka-topology1-2-1507756303-stormjar.jar with
> id 3b7263a0-fd39-4caf-b90b-3deaf3999308
> 2017-10-11 17:26:24.496 o.a.s.d.nimbus pool-14-thread-64 [INFO] Created
> download session for storm-kafka-topology1-2-1507756303-stormcode.ser
> with id cd88098a-22c7-44c3-b709-34a202bc4952
> 2017-10-11 17:26:24.529 o.a.s.d.nimbus pool-14-thread-63 [INFO] Created
> download session for storm-kafka-topology1-2-1507756303-stormconf.ser
> with id e7478249-7082-4edb-93bb-3746f39db06e
> 2017-10-11 17:28:29.931 o.a.s.d.nimbus timer [INFO] Executor
> storm-kafka-topology1-2-1507756303:[2 2] not alive
> 2017-10-11 17:28:29.931 o.a.s.d.nimbus timer [INFO] Executor
> storm-kafka-topology1-2-1507756303:[3 3] not alive
> 2017-10-11 17:28:29.931 o.a.s.d.nimbus timer [INFO] Executor
> storm-kafka-topology1-2-1507756303:[1 1] not alive
> 2017-10-11 17:28:29.932 o.a.s.d.nimbus timer [INFO] Executor
> storm-kafka-topology1-2-1507756303:[4 4] not alive
> 2017-10-11 17:28:29.932 o.a.s.d.nimbus timer [INFO] Executor
> kafkaTopology-1-1507755826:[2 2] not alive
> 2017-10-11 17:28:29.932 o.a.s.d.nimbus timer [INFO] Executor
> kafkaTopology-1-1507755826:[3 3] not alive
> 2017-10-11 17:28:29.932 o.a.s.d.nimbus timer [INFO] Executor
> kafkaTopology-1-1507755826:[1 1] not alive
> 2017-10-11 17:28:29.936 o.a.s.d.nimbus timer [INFO] Setting new assignment
> for topology id storm-kafka-topology1-2-1507756303:
> #org.apache.storm.daemon.common.Assignment{:master-code-dir
> "storm-local", :node->host {}, :executor->node+port {},
> :executor->start-time-secs {[1 1] 1507757182, [2 2] 1507757182, [3 3]
> 1507757182, [4 4] 1507757182}, :worker->resources {}, :owner "rbliton"}
> 2017-10-11 17:28:29.943 o.a.s.d.nimbus timer [INFO] Setting new assignment
> for topology id kafkaTopology-1-1507755826: #org.apache.storm.daemon.
> common.Assignment{:master-code-dir "storm-local", :node->host {},
> :executor->node+port {}, :executor->start-time-secs {[1 1] 1507757182, [2
> 2] 1507757182, [3 3] 1507757182}, :worker->resources {}, :owner "rbliton"}
>
>
> On Wed, Oct 11, 2017 at 4:56 PM, Stig Rohde Døssing <sr...@apache.org>
> wrote:
>
>> As far as I'm aware 1492 is a display bug only. It shouldn't affect how
>> your cluster works.
>>
>> If I'm understanding correctly you have Kafka, Zookeeper, Nimbus, one
>> supervisor and UI running on localhost, and there are no other machines
>> involved, right?
>>
>> Can you post your nimbus and supervisor logs?
>>
>> 2017-10-11 22:10 GMT+02:00 Ryan Bliton <ry...@gmail.com>:
>>
>>> just one quick update: fixed storm supervisor- that was something I did
>>> while messing with ports.
>>>
>>> the workers are still not being put to work however.
>>>
>>> On Wed, Oct 11, 2017 at 3:58 PM, Ryan Bliton <ry...@gmail.com>
>>> wrote:
>>>
>>>> I found this :https://issues.apache.org/jira/browse/STORM-1492
>>>>
>>>> "With the default value for nimbus.seeds (["localhost"]) Storm UI may
>>>> list one "Offline" nimbus for localhost, and another as "Leader" for the
>>>> resolved machine name.
>>>>  A workaround is to modify storm.yaml and replace "localhost" with the
>>>> hostname of the machine in nimbus.seeds."
>>>>
>>>> However, when I drop in my hostname, I am no longer able to spin up
>>>> workers! storm supervisor does nothing now.
>>>>
>>>>
>>>>
>>>> On Wed, Oct 11, 2017 at 3:42 PM, Ryan Bliton <ry...@gmail.com>
>>>> wrote:
>>>>
>>>>> Yes. Thank you for replying! I've been fussing over it some more and I
>>>>> think I'm getting closer to the issue.
>>>>>
>>>>> In fact, the logs do give a clue- my workers start in state "EMPTY
>>>>> -assignment null," do nothing, then get removed after not being used.
>>>>> The work isn't even hitting the workers.
>>>>>
>>>>> in my Storm UI, it lists my PC name (ABCD-PC123453.my.company.name)
>>>>> as the leader, and localhost as offline.
>>>>>
>>>>> So, somehow, I must have my nimbus and workers running somewhere
>>>>> completely different from the Kafka cluster, which are running on localhost.
>>>>>
>>>>> I am currently futzing with port numbers in storm.yaml.
>>>>>
>>>>> How can I bring localhost online as the leader?
>>>>>
>>>>> On Wed, Oct 11, 2017 at 2:58 PM, Stig Rohde Døssing <sr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Ryan,
>>>>>>
>>>>>> I don't see anything obviously wrong with your configuration. It's
>>>>>> likely your topology logs can tell you what's going wrong. Next time you
>>>>>> start your topology make note of the topology name in Storm UI. Also click
>>>>>> in to your spout in Storm UI and note which worker port(s) it's running on
>>>>>> (if you're running on a multi-node cluster you'll also need to note which
>>>>>> machine is running the spout). You should then be able to go to
>>>>>> $storm-install-dir/logs/workers-artifacts/$your-topology-name-here/$worker-port/worker.log
>>>>>> on the relevant worker and see what the spout worker is logging.
>>>>>>
>>>>>> In case you don't find anything interesting there, you might also
>>>>>> look at logs/nimbus.log on the machine running Nimbus and
>>>>>> logs/supervisor.log on the machine running the supervisor for those logs.
>>>>>>
>>>>>> Also just to make sure, you're running "storm supervisor" as well as
>>>>>> "storm nimbus", right? Otherwise your topology won't be assigned to a
>>>>>> worker.
>>>>>>
>>>>>> 2017-10-11 16:53 GMT+02:00 Ryan Bliton <ry...@gmail.com>:
>>>>>>
>>>>>>> Hi! I'm trying to get a starter Kafka-Storm integration going. I've
>>>>>>> got a simple topology working in Local mode- It reads the messages from a
>>>>>>> Kafka topic and sends them to a bolt that logs them. However, when I try to
>>>>>>> submit the Topology to a cluster, the Storm UI always reads 0 tuples
>>>>>>> emitted from the KafkaSpout.
>>>>>>>
>>>>>>> I've done several laps around the internet at this point, built and
>>>>>>> tried different starter projects, and each has the same issue. I can submit
>>>>>>> the Topology, but it won't actually work.
>>>>>>>
>>>>>>> Similar problems to mine seem to come from the Storm /lib and
>>>>>>> incompatible .jar files within. I haven't found anything like that in my
>>>>>>> case. However, I'm not 100% sure what I should be looking for so I can't
>>>>>>> rule it out.
>>>>>>>
>>>>>>> I don't know how to make code look pretty on a mailing list, so here
>>>>>>> is a stack overflow about my issue:
>>>>>>>
>>>>>>> https://stackoverflow.com/questions/46676377/apache-storm-ka
>>>>>>> fka-cant-see-sent-kafka-messages-in-storm-ui
>>>>>>>
>>>>>>> I make sure to call storm.supervisor before testing.
>>>>>>>
>>>>>>> I have zookeeper running off port 2181.
>>>>>>>
>>>>>>> I spin up a Kafka broker and use the topic storm-test-topic1.
>>>>>>>
>>>>>>> I fire up a console Kafka producer to send nonsense messages.
>>>>>>>
>>>>>>> Storm.yaml:
>>>>>>> ########### These MUST be filled in for a storm configuration
>>>>>>>  storm.zookeeper.servers:
>>>>>>>      - "localhost"
>>>>>>> #     - "server2"
>>>>>>> #
>>>>>>>  nimbus.seeds: ["localhost"]
>>>>>>> #
>>>>>>> #
>>>>>>>
>>>>>>> ------------------------------------------------------------
>>>>>>> ----------------------------------
>>>>>>> Topology:
>>>>>>>
>>>>>>> package com.kafka.storm;
>>>>>>>
>>>>>>> import java.util.HashMap;
>>>>>>>
>>>>>>> import org.apache.log4j.Logger;
>>>>>>> import org.apache.storm.Config;
>>>>>>> import org.apache.storm.LocalCluster;
>>>>>>> import org.apache.storm.StormSubmitter;
>>>>>>> import org.apache.storm.generated.AlreadyAliveException;
>>>>>>> import org.apache.storm.generated.AuthorizationException;
>>>>>>> import org.apache.storm.generated.InvalidTopologyException;
>>>>>>> import org.apache.storm.kafka.BrokerHosts;
>>>>>>> import org.apache.storm.kafka.KafkaSpout;
>>>>>>> import org.apache.storm.kafka.SpoutConfig;
>>>>>>> import org.apache.storm.kafka.StringScheme;
>>>>>>> import org.apache.storm.kafka.ZkHosts;
>>>>>>> import org.apache.storm.spout.SchemeAsMultiScheme;
>>>>>>> import org.apache.storm.topology.TopologyBuilder;
>>>>>>>
>>>>>>> import com.kafka.storm.bolt.LoggerBolt;
>>>>>>>
>>>>>>> public class KafkaStormIntegrationDemo {
>>>>>>> private static final Logger LOG = Logger.getLogger(KafkaStormInt
>>>>>>> egrationDemo.class);
>>>>>>>
>>>>>>> public static void main(String[] args) throws
>>>>>>> InvalidTopologyException, AuthorizationException, AlreadyAliveException {
>>>>>>>
>>>>>>> // Build Spout configuration using input command line parameters
>>>>>>> final BrokerHosts zkrHosts = new ZkHosts("localhost:2181");
>>>>>>> final String kafkaTopic = "storm-test-topic1";
>>>>>>> final String zkRoot = "";
>>>>>>> final String clientId = "storm-consumer";
>>>>>>> SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic,
>>>>>>> zkRoot, clientId);
>>>>>>> kafkaConf.startOffsetTime = -2;
>>>>>>> kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>>>
>>>>>>> // Build topology to consume message from kafka and print them on
>>>>>>> console
>>>>>>> final TopologyBuilder topologyBuilder = new TopologyBuilder();
>>>>>>> // Create KafkaSpout instance using Kafka configuration and add it
>>>>>>> to topology
>>>>>>> topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf),
>>>>>>> 1);
>>>>>>> //Route the output of Kafka Spout to Logger bolt to log messages
>>>>>>> consumed from Kafka
>>>>>>> topologyBuilder.setBolt("print-messages", new
>>>>>>> LoggerBolt()).globalGrouping("kafka-spout");
>>>>>>> // Submit topology to local cluster i.e. embedded storm instance in
>>>>>>> eclipse
>>>>>>> Config conf = new Config();
>>>>>>> System.setProperty("storm.jar","C://apache-storm-1.1.1/lib/s
>>>>>>> torm-core-1.1.1.jar");
>>>>>>> StormSubmitter.submitTopology("kafkaTopology", conf,
>>>>>>> topologyBuilder.createTopology());
>>>>>>> }
>>>>>>> }
>>>>>>> ------------------------------------------------------------
>>>>>>> ----------------------------------
>>>>>>>
>>>>>>> Bolt:
>>>>>>>
>>>>>>> package com.kafka.storm.bolt;
>>>>>>>
>>>>>>> import org.apache.log4j.Logger;
>>>>>>> import org.apache.storm.topology.BasicOutputCollector;
>>>>>>> import org.apache.storm.topology.OutputFieldsDeclarer;
>>>>>>> import org.apache.storm.topology.base.BaseBasicBolt;
>>>>>>> import org.apache.storm.tuple.Fields;
>>>>>>> import org.apache.storm.tuple.Tuple;
>>>>>>>
>>>>>>> public class LoggerBolt extends BaseBasicBolt{
>>>>>>> private static final long serialVersionUID = 1L;
>>>>>>> private static final Logger LOG = Logger.getLogger(LoggerBolt.cl
>>>>>>> ass);
>>>>>>>
>>>>>>> public void execute(Tuple input, BasicOutputCollector collector) {
>>>>>>> LOG.info(input.getString(0));
>>>>>>> }
>>>>>>>
>>>>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>> declarer.declare(new Fields("message"));
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> thank you in advance for any help you can give, or for just reading!
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

2017-10-11 17:26:20.880 o.a.s.s.o.a.z.ClientCnxn
main-SendThread(0:0:0:0:0:0:0:1:2181) [INFO] Opening socket connection to
server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181. Will not attempt to
authenticate using SASL (unknown error)
2017-10-11 17:26:20.881 o.a.s.s.o.a.z.ClientCnxn
main-SendThread(0:0:0:0:0:0:0:1:2181) [INFO] Socket connection established
to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181, initiating session
2017-10-11 17:26:20.932 o.a.s.s.o.a.z.ClientCnxn
main-SendThread(0:0:0:0:0:0:0:1:2181) [INFO] Session establishment complete
on server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181, sessionid =
0x15f0d3b46a70017, negotiated timeout = 20000
2017-10-11 17:26:20.933 o.a.s.s.o.a.c.f.s.ConnectionStateManager
main-EventThread [INFO] State change: CONNECTED
2017-10-11 17:26:20.952 o.a.s.l.Localizer main [INFO] Reconstruct localized
resource: C:\apache-storm-1.1.1\storm-local\supervisor\usercache
2017-10-11 17:26:20.954 o.a.s.l.Localizer main [WARN] No left over
resources found for any user during reconstructing of local resources at:
C:\apache-storm-1.1.1\storm-local\supervisor\usercache
2017-10-11 17:26:20.962 o.a.s.d.s.Supervisor main [INFO] Starting
supervisor for storm version '1.1.1'.
2017-10-11 17:26:21.005 o.a.s.d.s.Slot main [WARN] SLOT [MY PC NAME]:6700
Starting in state EMPTY - assignment null
2017-10-11 17:26:21.006 o.a.s.d.s.Slot main [WARN] SLOT [MY PC NAME]:6701
Starting in state EMPTY - assignment null
2017-10-11 17:26:21.006 o.a.s.d.s.Slot main [WARN] SLOT [MY PC NAME]:6701
Starting in state EMPTY - assignment null
2017-10-11 17:26:21.009 o.a.s.d.s.Slot main [WARN] SLOT [MY PC NAME]:6703
Starting in state EMPTY - assignment null
2017-10-11 17:26:21.020 o.a.s.d.s.Container main [INFO] GET worker-user for
964b34c1-d7e3-4f22-9ee2-a0664c10294e
2017-10-11 17:26:21.022 o.a.s.d.s.Container main [INFO] Cleaning up
5ef774ff-f24c-4810-9330-5e1291bd7539:964b34c1-d7e3-4f22-9ee2-a0664c10294e
2017-10-11 17:26:21.023 o.a.s.d.s.Container main [INFO] GET worker-user for
964b34c1-d7e3-4f22-9ee2-a0664c10294e
2017-10-11 17:26:21.024 o.a.s.d.s.AdvancedFSOps main [INFO] Deleting path
C:\apache-storm-1.1.1\storm-local\workers\964b34c1-d7e3-4f22-9ee2-a0664c10294e\heartbeats
2017-10-11 17:26:21.029 o.a.s.d.s.AdvancedFSOps main [INFO] Deleting path
C:\apache-storm-1.1.1\storm-local\workers\964b34c1-d7e3-4f22-9ee2-a0664c10294e\pids
2017-10-11 17:26:21.032 o.a.s.d.s.AdvancedFSOps main [INFO] Deleting path
C:\apache-storm-1.1.1\storm-local\workers\964b34c1-d7e3-4f22-9ee2-a0664c10294e\tmp
2017-10-11 17:26:21.034 o.a.s.d.s.AdvancedFSOps main [INFO] Deleting path
C:\apache-storm-1.1.1\storm-local\workers\964b34c1-d7e3-4f22-9ee2-a0664c10294e

Nimbus logs:

2017-10-11 17:26:22.189 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[2 2] not alive
2017-10-11 17:26:22.189 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[3 3] not alive
2017-10-11 17:26:22.189 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[1 1] not alive
2017-10-11 17:26:22.189 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[4 4] not alive
2017-10-11 17:26:22.190 o.a.s.d.nimbus timer [INFO] Executor
kafkaTopology-1-1507755826:[2 2] not alive
2017-10-11 17:26:22.190 o.a.s.d.nimbus timer [INFO] Executor
kafkaTopology-1-1507755826:[3 3] not alive
2017-10-11 17:26:22.190 o.a.s.d.nimbus timer [INFO] Executor
kafkaTopology-1-1507755826:[1 1] not alive
2017-10-11 17:26:22.192 o.a.s.s.EvenScheduler timer [INFO] Available slots:
(["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6702]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6703])
2017-10-11 17:26:22.192 o.a.s.s.EvenScheduler timer [INFO] Available slots:
(["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6702]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6703])
2017-10-11 17:26:22.195 o.a.s.d.nimbus timer [INFO] Setting new assignment
for topology id storm-kafka-topology1-2-1507756303:
#org.apache.storm.daemon.common.Assignment{:master-code-dir "storm-local",
:node->host {"5ef774ff-f24c-4810-9330-5e1291bd7539" "
LNAR-PC0611K6.corp.capgemini.com"}, :executor->node+port {[4 4]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701], [3 3]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701], [2 2]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701], [1 1]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701]}, :executor->start-time-secs
{[1 1] 1507757182, [2 2] 1507757182, [3 3] 1507757182, [4 4] 1507757182},
:worker->resources {["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701] [0.0 0.0
0.0]}, :owner "rbliton"}
2017-10-11 17:26:22.201 o.a.s.d.nimbus timer [INFO] Setting new assignment
for topology id kafkaTopology-1-1507755826:
#org.apache.storm.daemon.common.Assignment{:master-code-dir "storm-local",
:node->host {"5ef774ff-f24c-4810-9330-5e1291bd7539" "
LNAR-PC0611K6.corp.capgemini.com"}, :executor->node+port {[2 2]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700], [1 1]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700], [3 3]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700]}, :executor->start-time-secs
{[1 1] 1507757182, [2 2] 1507757182, [3 3] 1507757182}, :worker->resources
{["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700] [0.0 0.0 0.0]}, :owner
"rbliton"}
2017-10-11 17:26:23.209 o.a.s.d.nimbus pool-14-thread-25 [INFO] Created
download session for kafkaTopology-1-1507755826-stormjar.jar with id
bed36ea8-1d05-4d80-8006-c2bfb40ed8f6
2017-10-11 17:26:23.801 o.a.s.d.nimbus pool-14-thread-7 [INFO] Created
download session for kafkaTopology-1-1507755826-stormcode.ser with id
7227688a-4381-48d5-ac27-c6a6e03f5a5d
2017-10-11 17:26:23.837 o.a.s.d.nimbus pool-14-thread-11 [INFO] Created
download session for kafkaTopology-1-1507755826-stormconf.ser with id
d4d6712b-55ec-46fe-8e9c-6df068d65133
2017-10-11 17:26:23.913 o.a.s.d.nimbus pool-14-thread-20 [INFO] Created
download session for storm-kafka-topology1-2-1507756303-stormjar.jar with
id 3b7263a0-fd39-4caf-b90b-3deaf3999308
2017-10-11 17:26:24.496 o.a.s.d.nimbus pool-14-thread-64 [INFO] Created
download session for storm-kafka-topology1-2-1507756303-stormcode.ser with
id cd88098a-22c7-44c3-b709-34a202bc4952
2017-10-11 17:26:24.529 o.a.s.d.nimbus pool-14-thread-63 [INFO] Created
download session for storm-kafka-topology1-2-1507756303-stormconf.ser with
id e7478249-7082-4edb-93bb-3746f39db06e
2017-10-11 17:28:29.931 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[2 2] not alive
2017-10-11 17:28:29.931 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[3 3] not alive
2017-10-11 17:28:29.931 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[1 1] not alive
2017-10-11 17:28:29.932 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[4 4] not alive
2017-10-11 17:28:29.932 o.a.s.d.nimbus timer [INFO] Executor
kafkaTopology-1-1507755826:[2 2] not alive
2017-10-11 17:28:29.932 o.a.s.d.nimbus timer [INFO] Executor
kafkaTopology-1-1507755826:[3 3] not alive
2017-10-11 17:28:29.932 o.a.s.d.nimbus timer [INFO] Executor
kafkaTopology-1-1507755826:[1 1] not alive
2017-10-11 17:28:29.936 o.a.s.d.nimbus timer [INFO] Setting new assignment
for topology id storm-kafka-topology1-2-1507756303:
#org.apache.storm.daemon.common.Assignment{:master-code-dir "storm-local",
:node->host {}, :executor->node+port {}, :executor->start-time-secs {[1 1]
1507757182, [2 2] 1507757182, [3 3] 1507757182, [4 4] 1507757182},
:worker->resources {}, :owner "rbliton"}
2017-10-11 17:28:29.943 o.a.s.d.nimbus timer [INFO] Setting new assignment
for topology id kafkaTopology-1-1507755826:
#org.apache.storm.daemon.common.Assignment{:master-code-dir "storm-local",
:node->host {}, :executor->node+port {}, :executor->start-time-secs {[1 1]
1507757182, [2 2] 1507757182, [3 3] 1507757182}, :worker->resources {},
:owner "rbliton"}
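
The nimbus log above shows the same cycle for both topologies: executors reported not alive, slots reassigned, the jar downloaded, and roughly two minutes later the assignment cleared again. One workaround raised in this thread for the related UI symptom (STORM-1492) is to pin the hostname Storm uses. A minimal single-machine storm.yaml along those lines might look like the sketch below; storm.local.hostname is a standard Storm setting, but applying it here is a suggestion, not something taken from the poster's actual file:

```yaml
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
    - "localhost"

nimbus.seeds: ["localhost"]

# Pin the hostname this node reports, so the UI does not list one
# "Offline" nimbus for localhost and a separate "Leader" under the
# resolved machine name (the STORM-1492 symptom).
storm.local.hostname: "localhost"
```

With this in place the UI should report a single nimbus entry rather than one Leader and one Offline entry.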


On Wed, Oct 11, 2017 at 4:56 PM, Stig Rohde Døssing <sr...@apache.org> wrote:

> As far as I'm aware STORM-1492 is a display bug only. It shouldn't affect how
> your cluster works.
>
> If I'm understanding correctly you have Kafka, Zookeeper, Nimbus, one
> supervisor and UI running on localhost, and there are no other machines
> involved, right?
>
> Can you post your nimbus and supervisor logs?
>
> 2017-10-11 22:10 GMT+02:00 Ryan Bliton <ry...@gmail.com>:
>
>> Just one quick update: I fixed storm supervisor; that was something I
>> broke while messing with ports.
>>
>> The workers are still not being put to work, however.
>>
>> On Wed, Oct 11, 2017 at 3:58 PM, Ryan Bliton <ry...@gmail.com>
>> wrote:
>>
>>> I found this :https://issues.apache.org/jira/browse/STORM-1492
>>>
>>> "With the default value for nimbus.seeds (["localhost"]) Storm UI may
>>> list one "Offline" nimbus for localhost, and another as "Leader" for the
>>> resolved machine name.
>>>  A workaround is to modify storm.yaml and replace "localhost" with the
>>> hostname of the machine in nimbus.seeds."
>>>
>>> However, when I drop in my hostname, I am no longer able to spin up
>>> workers! storm supervisor does nothing now.
>>>
>>>
>>>
>>> On Wed, Oct 11, 2017 at 3:42 PM, Ryan Bliton <ry...@gmail.com>
>>> wrote:
>>>
>>>> Yes. Thank you for replying! I've been fussing over it some more and I
>>>> think I'm getting closer to the issue.
>>>>
>>> In fact, the logs do give a clue: my workers start in state "EMPTY -
>>> assignment null," do nothing, then get removed after not being used.
>>> The work isn't even reaching the workers.
>>>
>>> In my Storm UI, it lists my PC name (ABCD-PC123453.my.company.name) as
>>> the leader, and localhost as offline.
>>>
>>> So, somehow, I must have my nimbus and workers running somewhere
>>> completely different from the Kafka cluster, which is running on localhost.
>>>>
>>>> I am currently futzing with port numbers in storm.yaml.
>>>>
>>>> How can I bring localhost online as the leader?
>>>>
>>>> On Wed, Oct 11, 2017 at 2:58 PM, Stig Rohde Døssing <sr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Ryan,
>>>>>
>>>>> I don't see anything obviously wrong with your configuration. It's
>>>>> likely your topology logs can tell you what's going wrong. Next time you
>>>>> start your topology make note of the topology name in Storm UI. Also click
>>>>> in to your spout in Storm UI and note which worker port(s) it's running on
>>>>> (if you're running on a multi-node cluster you'll also need to note which
>>>>> machine is running the spout). You should then be able to go to
>>>>> $storm-install-dir/logs/workers-artifacts/$your-topology-name-here/$worker-port/worker.log
>>>>> on the relevant worker and see what the spout worker is logging.
>>>>>
>>>>> In case you don't find anything interesting there, you might also look
>>>>> at logs/nimbus.log on the machine running Nimbus and logs/supervisor.log on
>>>>> the machine running the supervisor for those logs.
>>>>>
>>>>> Also just to make sure, you're running "storm supervisor" as well as
>>>>> "storm nimbus", right? Otherwise your topology won't be assigned to a
>>>>> worker.
>>>>>
>>>>> 2017-10-11 16:53 GMT+02:00 Ryan Bliton <ry...@gmail.com>:
>>>>>
>>>>>> Hi! I'm trying to get a starter Kafka-Storm integration going. I've
>>>>>> got a simple topology working in Local mode- It reads the messages from a
>>>>>> Kafka topic and sends them to a bolt that logs them. However, when I try to
>>>>>> submit the Topology to a cluster, the Storm UI always reads 0 tuples
>>>>>> emitted from the KafkaSpout.
>>>>>>
>>>>>> I've done several laps around the internet at this point, built and
>>>>>> tried different starter projects, and each has the same issue. I can submit
>>>>>> the Topology, but it won't actually work.
>>>>>>
>>>>>> Similar problems to mine seem to come from the Storm /lib and
>>>>>> incompatible .jar files within. I haven't found anything like that in my
>>>>>> case. However, I'm not 100% sure what I should be looking for so I can't
>>>>>> rule it out.
>>>>>>
>>>>>> I don't know how to make code look pretty on a mailing list, so here
>>>>>> is a Stack Overflow question about my issue:
>>>>>>
>>>>>> https://stackoverflow.com/questions/46676377/apache-storm-kafka-cant-see-sent-kafka-messages-in-storm-ui
>>>>>>
>>>>>> I make sure to run "storm supervisor" before testing.
>>>>>>
>>>>>> I have zookeeper running off port 2181.
>>>>>>
>>>>>> I spin up a Kafka broker and use the topic storm-test-topic1.
>>>>>>
>>>>>> I fire up a console Kafka producer to send nonsense messages.
>>>>>>
>>>>>> Storm.yaml:
>>>>>> ########### These MUST be filled in for a storm configuration
>>>>>>  storm.zookeeper.servers:
>>>>>>      - "localhost"
>>>>>> #     - "server2"
>>>>>> #
>>>>>>  nimbus.seeds: ["localhost"]
>>>>>> #
>>>>>> #
>>>>>>
>>>>>> ----------------------------------------------------------------------
>>>>>> Topology:
>>>>>>
>>>>>> package com.kafka.storm;
>>>>>>
>>>>>> import java.util.HashMap;
>>>>>>
>>>>>> import org.apache.log4j.Logger;
>>>>>> import org.apache.storm.Config;
>>>>>> import org.apache.storm.LocalCluster;
>>>>>> import org.apache.storm.StormSubmitter;
>>>>>> import org.apache.storm.generated.AlreadyAliveException;
>>>>>> import org.apache.storm.generated.AuthorizationException;
>>>>>> import org.apache.storm.generated.InvalidTopologyException;
>>>>>> import org.apache.storm.kafka.BrokerHosts;
>>>>>> import org.apache.storm.kafka.KafkaSpout;
>>>>>> import org.apache.storm.kafka.SpoutConfig;
>>>>>> import org.apache.storm.kafka.StringScheme;
>>>>>> import org.apache.storm.kafka.ZkHosts;
>>>>>> import org.apache.storm.spout.SchemeAsMultiScheme;
>>>>>> import org.apache.storm.topology.TopologyBuilder;
>>>>>>
>>>>>> import com.kafka.storm.bolt.LoggerBolt;
>>>>>>
>>>>>> public class KafkaStormIntegrationDemo {
>>>>>>     private static final Logger LOG =
>>>>>>             Logger.getLogger(KafkaStormIntegrationDemo.class);
>>>>>>
>>>>>>     public static void main(String[] args) throws InvalidTopologyException,
>>>>>>             AuthorizationException, AlreadyAliveException {
>>>>>>
>>>>>>         // Build the spout configuration
>>>>>>         final BrokerHosts zkrHosts = new ZkHosts("localhost:2181");
>>>>>>         final String kafkaTopic = "storm-test-topic1";
>>>>>>         final String zkRoot = "";
>>>>>>         final String clientId = "storm-consumer";
>>>>>>         SpoutConfig kafkaConf =
>>>>>>                 new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId);
>>>>>>         kafkaConf.startOffsetTime = -2; // start from the earliest offset
>>>>>>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>>
>>>>>>         // Build a topology that consumes messages from Kafka and logs them
>>>>>>         final TopologyBuilder topologyBuilder = new TopologyBuilder();
>>>>>>         // Create a KafkaSpout from the Kafka configuration
>>>>>>         topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
>>>>>>         // Route the spout's output to the logger bolt
>>>>>>         topologyBuilder.setBolt("print-messages",
>>>>>>                 new LoggerBolt()).globalGrouping("kafka-spout");
>>>>>>         // Submit the topology to the cluster (run from Eclipse, so
>>>>>>         // storm.jar has to be set by hand)
>>>>>>         Config conf = new Config();
>>>>>>         System.setProperty("storm.jar",
>>>>>>                 "C://apache-storm-1.1.1/lib/storm-core-1.1.1.jar");
>>>>>>         StormSubmitter.submitTopology("kafkaTopology", conf,
>>>>>>                 topologyBuilder.createTopology());
>>>>>>     }
>>>>>> }
>>>>>> ----------------------------------------------------------------------
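
One detail worth flagging in the topology code above: System.setProperty("storm.jar", ...) points at storm-core-1.1.1.jar rather than at a jar containing the topology's own classes (KafkaStormIntegrationDemo, LoggerBolt, and the storm-kafka spout dependencies). When a topology is submitted with the "storm jar" command, Storm sets storm.jar to the topology's packaged jar automatically, so the workers can load those classes. This is a hypothesis consistent with the "not alive" executors in the nimbus log, not something confirmed in the thread; the conventional submission is sketched below, where the artifact name and build tool are assumptions:

```shell
# Package the topology into a jar (an uber/shaded jar that bundles the
# storm-kafka dependencies is typical), then submit it via the storm CLI,
# which sets storm.jar to the topology jar itself.
mvn clean package
storm jar target/kafka-storm-demo-1.0.jar \
    com.kafka.storm.KafkaStormIntegrationDemo
```
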
>>>>>>
>>>>>> Bolt:
>>>>>>
>>>>>> package com.kafka.storm.bolt;
>>>>>>
>>>>>> import org.apache.log4j.Logger;
>>>>>> import org.apache.storm.topology.BasicOutputCollector;
>>>>>> import org.apache.storm.topology.OutputFieldsDeclarer;
>>>>>> import org.apache.storm.topology.base.BaseBasicBolt;
>>>>>> import org.apache.storm.tuple.Fields;
>>>>>> import org.apache.storm.tuple.Tuple;
>>>>>>
>>>>>> public class LoggerBolt extends BaseBasicBolt {
>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>     private static final Logger LOG = Logger.getLogger(LoggerBolt.class);
>>>>>>
>>>>>>     public void execute(Tuple input, BasicOutputCollector collector) {
>>>>>>         LOG.info(input.getString(0));
>>>>>>     }
>>>>>>
>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>         declarer.declare(new Fields("message"));
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Thank you in advance for any help you can give, or for just reading!
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
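
For reference, the Kafka-side setup described in the first message (a broker on localhost, the storm-test-topic1 topic, and a console producer sending test messages) corresponds roughly to the stock Kafka tooling of that era; the install-relative paths and the broker port 9092 are assumptions, not details from the thread:

```shell
# Create the topic the spout subscribes to (single broker, single partition).
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic storm-test-topic1

# Start a console producer and type test messages, one per line.
bin/kafka-console-producer.sh --broker-list localhost:9092 \
    --topic storm-test-topic1
```
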


>>>> import org.apache.storm.topology.base.BaseBasicBolt;
>>>> import org.apache.storm.tuple.Fields;
>>>> import org.apache.storm.tuple.Tuple;
>>>>
>>>> public class LoggerBolt extends BaseBasicBolt{
>>>> private static final long serialVersionUID = 1L;
>>>> private static final Logger LOG = Logger.getLogger(LoggerBolt.class);
>>>>
>>>> public void execute(Tuple input, BasicOutputCollector collector) {
>>>> LOG.info(input.getString(0));
>>>> }
>>>>
>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>> declarer.declare(new Fields("message"));
>>>> }
>>>> }
>>>>
>>>>
>>>> thank you in advance for any help you can give, or for just reading!
>>>>
>>>>
>>>
>>
>

Re: New to Storm- KafkaSpout won't emit Tuples in Cluster mode

Posted by Ryan Bliton <ry...@gmail.com>.
I found this: https://issues.apache.org/jira/browse/STORM-1492

"With the default value for nimbus.seeds (["localhost"]) Storm UI may list
one "Offline" nimbus for localhost, and another as "Leader" for the
resolved machine name. A workaround is to modify storm.yaml and replace
"localhost" with the hostname of the machine in nimbus.seeds."

However, when I drop in my hostname, I can no longer spin up workers:
"storm supervisor" now does nothing.
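Concretely, the STORM-1492 workaround amounts to a storm.yaml along these lines (the hostname below is just the example name from this thread; substitute your machine's actual resolved name):

```yaml
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
    - "localhost"

# Replace "localhost" with the machine's resolved hostname so the seed list
# matches the name Nimbus registers as "Leader" (hostname is an example):
nimbus.seeds: ["ABCD-PC123453.my.company.name"]
```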




Re: New to Storm- KafkaSpout won't emit Tuples in Cluster mode

Posted by Ryan Bliton <ry...@gmail.com>.
Yes. Thank you for replying! I've been fussing over it some more and I
think I'm getting closer to the issue.

In fact, the logs do give a clue: my workers start in state "EMPTY
-assignment null," do nothing, then get removed after not being used.
The work isn't even reaching the workers.

In my Storm UI, it lists my PC name (ABCD-PC123453.my.company.name) as the
leader, and localhost as offline.

So, somehow, I must have my Nimbus and workers running somewhere completely
different from the Kafka cluster, which is running on localhost.

I am currently futzing with port numbers in storm.yaml.

How can I bring localhost online as the leader?
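One quick, Storm-agnostic way to see this kind of mismatch (a sketch only; nothing here is Storm API) is to compare what "localhost" and the machine's own name resolve to:

```python
import socket

# What "localhost" resolves to (IPv4)
localhost_addr = socket.gethostbyname("localhost")

# What the machine's own name resolves to; if this differs from
# localhost_addr, a Nimbus registered under the machine name will never
# match a nimbus.seeds entry of "localhost" -- the "localhost offline,
# hostname is Leader" symptom seen in Storm UI.
machine_name = socket.getfqdn()
try:
    machine_addr = socket.gethostbyname(machine_name)
except socket.gaierror:
    machine_addr = None  # the machine name doesn't resolve at all

print("localhost ->", localhost_addr)
print(machine_name, "->", machine_addr)
```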


Re: New to Storm- KafkaSpout won't emit Tuples in Cluster mode

Posted by Stig Rohde Døssing <sr...@apache.org>.
Hi Ryan,

I don't see anything obviously wrong with your configuration. It's likely
your topology logs can tell you what's going wrong. Next time you start
your topology, make note of the topology name in Storm UI. Also click into
your spout in Storm UI and note which worker port(s) it's running on (if
you're running on a multi-node cluster you'll also need to note which
machine is running the spout). You should then be able to go to
$storm-install-dir/logs/workers-artifacts/$your-topology-name-here/$worker-port/worker.log
on the relevant worker and see what the spout worker is logging.

In case you don't find anything interesting there, you might also look at
logs/nimbus.log on the machine running Nimbus and logs/supervisor.log on
the machine running the supervisor for those logs.

Also just to make sure, you're running "storm supervisor" as well as "storm
nimbus", right? Otherwise your topology won't be assigned to a worker.
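To make that log location concrete, here is a sketch that just assembles the path (the install dir, topology name, and worker port below are made-up examples; read the real ones off Storm UI):

```python
import os.path

# Hypothetical values -- substitute your own install dir, and take the real
# topology name and worker port from Storm UI
storm_dir = "/opt/apache-storm-1.1.1"
topology_name = "kafkaTopology-1-1507733612"
worker_port = 6700

# $storm-install-dir/logs/workers-artifacts/$topology-name/$worker-port/worker.log
worker_log = os.path.join(storm_dir, "logs", "workers-artifacts",
                          topology_name, str(worker_port), "worker.log")
print(worker_log)
# tail -f this file while the topology runs to watch what the spout logs
```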

2017-10-11 16:53 GMT+02:00 Ryan Bliton <ry...@gmail.com>:

> Hi! I'm trying to get a starter Kafka-Storm integration going. I've got a
> simple topology working in Local mode- It reads the messages from a Kafka
> topic and sends them to a bolt that logs them. However, when I try to
> submit the Topology to a cluster, the Storm UI always reads 0 tuples
> emitted from the KafkaSpout.
>
> I've done several laps around the internet at this point, built and tried
> different starter projects, and each has the same issue. I can submit the
> Topology, but it won't actually work.
>
> Similar problems to mine seem to come from the Storm /lib and incompatible
> .jar files within. I haven't found anything like that in my case. However,
> I'm not 100% sure what I should be looking for so I can't rule it out.
>
> I don't know how to make code look pretty on a mailing list, so here is a
> stack overflow about my issue:
>
> https://stackoverflow.com/questions/46676377/apache-storm-kafka-cant-see-sent-kafka-messages-in-storm-ui
>
> I make sure to call storm.supervisor before testing.
>
> I have zookeeper running off port 2181.
>
> I spin up a Kafka broker and use the topic storm-test-topic1.
>
> I fire up a console Kafka producer to send nonsense messages.
>
> Storm.yaml:
> ########### These MUST be filled in for a storm configuration
>  storm.zookeeper.servers:
>      - "localhost"
> #     - "server2"
> #
>  nimbus.seeds: ["localhost"]
> #
> #
>
> ----------------------------------------------------------------------------------------------
> Topology:
>
> package com.kafka.storm;
>
> import java.util.HashMap;
>
> import org.apache.log4j.Logger;
> import org.apache.storm.Config;
> import org.apache.storm.LocalCluster;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.AlreadyAliveException;
> import org.apache.storm.generated.AuthorizationException;
> import org.apache.storm.generated.InvalidTopologyException;
> import org.apache.storm.kafka.BrokerHosts;
> import org.apache.storm.kafka.KafkaSpout;
> import org.apache.storm.kafka.SpoutConfig;
> import org.apache.storm.kafka.StringScheme;
> import org.apache.storm.kafka.ZkHosts;
> import org.apache.storm.spout.SchemeAsMultiScheme;
> import org.apache.storm.topology.TopologyBuilder;
>
> import com.kafka.storm.bolt.LoggerBolt;
>
> public class KafkaStormIntegrationDemo {
> private static final Logger LOG = Logger.getLogger(KafkaStormIntegrationDemo.class);
>
> public static void main(String[] args) throws InvalidTopologyException,
> AuthorizationException, AlreadyAliveException {
>
> // Build Spout configuration using input command line parameters
> final BrokerHosts zkrHosts = new ZkHosts("localhost:2181");
> final String kafkaTopic = "storm-test-topic1";
> final String zkRoot = "";
> final String clientId = "storm-consumer";
> SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot,
> clientId);
> kafkaConf.startOffsetTime = -2;
> kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>
> // Build topology to consume message from kafka and print them on console
> final TopologyBuilder topologyBuilder = new TopologyBuilder();
> // Create KafkaSpout instance using Kafka configuration and add it to
> topology
> topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
> //Route the output of Kafka Spout to Logger bolt to log messages consumed
> from Kafka
> topologyBuilder.setBolt("print-messages", new
> LoggerBolt()).globalGrouping("kafka-spout");
> // Submit topology to local cluster i.e. embedded storm instance in eclipse
> Config conf = new Config();
> System.setProperty("storm.jar","C://apache-storm-1.1.1/lib/storm-core-1.1.1.jar");
> StormSubmitter.submitTopology("kafkaTopology", conf, topologyBuilder.createTopology());
> }
> }
> ----------------------------------------------------------------------------------------------
>
> Bolt:
>
> package com.kafka.storm.bolt;
>
> import org.apache.log4j.Logger;
> import org.apache.storm.topology.BasicOutputCollector;
> import org.apache.storm.topology.OutputFieldsDeclarer;
> import org.apache.storm.topology.base.BaseBasicBolt;
> import org.apache.storm.tuple.Fields;
> import org.apache.storm.tuple.Tuple;
>
> public class LoggerBolt extends BaseBasicBolt{
> private static final long serialVersionUID = 1L;
> private static final Logger LOG = Logger.getLogger(LoggerBolt.class);
>
> public void execute(Tuple input, BasicOutputCollector collector) {
> LOG.info(input.getString(0));
> }
>
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("message"));
> }
> }
>
>
> thank you in advance for any help you can give, or for just reading!
>
>