Posted to user@flume.apache.org by Justin Ryan <ju...@ziprealty.com> on 2016/02/04 22:16:53 UTC

KafkaSource not picking up any messages

Hiya folks,

I'm setting up a new environment with Kafka, Flume, and HDFS, and have
implemented the simplest possible testing configuration I can come up with.
It logs successfully configuring and starting the KafkaSource, and with
kafka tools I can confirm that messages have been sent, but the JSON Metrics
from Flume show 0 messages processed.

Are there any more tools at my disposal to investigate? Any assistance would
be greatly appreciated!

My config and log:

--
# generated by Chef for mesos10, changes will be overwritten

flume1.sources=kafka-source-test
flume1.channels=hdfs-channel-kafka
flume1.sinks=hdfs-sink-kafka
flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
flume1.sources.kafka-source-test.topic=test
flume1.sources.kafka-source-test.groupId=flume
flume1.sources.kafka-source-test.interceptors=i1
flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
flume1.sources.kafka-source-test.consumer.timeout.ms=100
flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
flume1.channels.hdfs-channel-kafka.type=memory
flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
flume1.sinks.hdfs-sink-kafka.type=hdfs
flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
flume1.channels.hdfs-channel-kafka.capacity=10
flume1.channels.hdfs-channel-kafka.transactionCapacity=10
--

Startup log (less incredibly long path lines):
--
16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
Configuration provider starting
16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
Reloading configuration file:/etc/flume/conf.chef/flume.conf
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink-kafka
Agent: flume1
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume
configuration contains configuration for agents: [flume1]
16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating channels
16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of
channel hdfs-channel-kafka type memory
16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel
hdfs-channel-kafka
16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of
source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={
parameters:{interceptors.i1.type=timestamp,
zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka,
groupId=flume, consumer.timeout.ms=100, topic=test,
type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1} }
16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink:
hdfs-sink-kafka, type: hdfs
16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel
hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
16/02/04 11:32:07 INFO node.Application: Starting new configuration:{
sourceRunners:{kafka-source-test=PollableSourceRunner: {
source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}
counterGroup:{ name:null counters:{} } }}
sinkRunners:{hdfs-sink-kafka=SinkRunner: {
policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{
name:null counters:{} } }}
channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name:
hdfs-channel-kafka}} }
16/02/04 11:32:07 INFO node.Application: Starting Channel hdfs-channel-kafka
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully
registered new MBean.
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component
type: CHANNEL, name: hdfs-channel-kafka started
16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
16/02/04 11:32:07 INFO kafka.KafkaSource: Starting
org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}
...
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
counter group for type: SINK, name: hdfs-sink-kafka: Successfully registered
new MBean.
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component
type: SINK, name: hdfs-sink-kafka started
16/02/04 11:32:07 INFO mortbay.log: Logging to
org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via
org.mortbay.log.Slf4jLog
16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
16/02/04 11:32:07 INFO mortbay.log: Started
SelectChannelConnector@0.0.0.0:34545
16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
auto.commit.enable is overridden to false
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
consumer.timeout.ms is overridden to 10
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is
overridden to flume
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
zookeeper.connect is overridden to zk01:2181/mesos-kafka
16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at
zk01:2181/mesos-kafka
16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event
thread.
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:host.name=mesos10
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:java.version=1.8.0_72-internal
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:java.vendor=Oracle Corporation
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:java.io.tmpdir=/tmp
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:java.compiler=<NA>
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:os.version=3.13.0-63-generic
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:user.name=marathon
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:user.home=/opt/marathon
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection,
connectString=zk01:2181/mesos-kafka sessionTimeout=6000
watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to
server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate
using SASL (unknown error)
16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established
to 10.100.6.251/10.100.6.251:2181, initiating session
16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment complete
on server 10.100.6.251/10.100.6.251:2181, sessionid = 0x152858b1cc07491,
negotiated timeout = 6000
16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed
(SyncConnected)
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], begin registering consumer
flume_mesos10-1454614328204-ca8a74df in ZK
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], end registering consumer
flume_mesos10-1454614328204-ca8a74df in ZK
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread for
consumer flume_mesos10-1454614328204-ca8a74df
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer
flume_mesos10-1454614328204-ca8a74df try #0
16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to
rebalance.
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer
flume_mesos10-1454614328204-ca8a74df try #0
16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test do
started.
16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored
counter group for type: SOURCE, name: kafka-source-test: Successfully
registered new MBean.
16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component
type: SOURCE, name: kafka-source-test started
--
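
One quick check against a startup log like the one above is to compare
what the config file sets with what the Kafka consumer reports as applied.
The sketch below is only illustrative: applied_properties and mismatches
are hypothetical helper names, the keys in `configured` were rewritten by
hand to Kafka property names (groupId became group.id), and it assumes the
log keeps the "Property <name> is overridden to <value>" wording.

```python
import re

def applied_properties(log_text):
    """Collect 'Property <name> is overridden to <value>' pairs from a log."""
    # Wrapped log lines are rejoined by collapsing every whitespace run.
    flat = " ".join(log_text.split())
    return dict(re.findall(r"Property (\S+) is overridden to (\S+)", flat))

def mismatches(configured, applied):
    """Return {name: (configured, applied)} for values that differ."""
    return {
        name: (value, applied[name])
        for name, value in configured.items()
        if name in applied and applied[name] != value
    }

# Two entries copied from the startup log in this message.
sample_log = """
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
consumer.timeout.ms is overridden to 10
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is
overridden to flume
"""

# Values from the config above, keyed by Kafka property name (assumption).
configured = {"consumer.timeout.ms": "100", "group.id": "flume"}
print(mismatches(configured, applied_properties(sample_log)))
```

With the values from this message it reports consumer.timeout.ms as
configured to 100 but applied as 10. The thread does not establish why.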

--
Justin Alan Ryan
Sr. Systems / Release Engineer
ZipRealty



Re: KafkaSource not picking up any messages

Posted by Justin Ryan <ju...@ziprealty.com>.
So it looks like Flume is built against Kafka 2.10-0.8.1.1, and I'm using
2.10-0.8.2.2, which kafka-mesos leans on.

Any likelihood of this being incompatible?

I did notice this morning that Flume has an "EventReceivedCount" that
increments 1-2k at a time and is always a round number, but the
EventAcceptedCount is 0.

When I pipe dmesg to the console producer, and then try to read it with the
console consumer, I have to tell it to start from the beginning, so it seems
like flume may be reading events and moving the marker, but not doing
anything with them.

--
{
   "CHANNEL.hdfs-channel-kafka" : {
      "EventTakeAttemptCount" : "117",
      "EventTakeSuccessCount" : "0",
      "StartTime" : "1454959470045",
      "ChannelFillPercentage" : "0.0",
      "ChannelSize" : "0",
      "EventPutSuccessCount" : "0",
      "Type" : "CHANNEL",
      "EventPutAttemptCount" : "1826",
      "ChannelCapacity" : "10",
      "StopTime" : "0"
   },
   "SOURCE.kafka-source-test" : {
      "StartTime" : "1454959471241",
      "OpenConnectionCount" : "0",
      "AppendBatchReceivedCount" : "0",
      "EventReceivedCount" : "166000",
      "KafkaCommitTimer" : "0",
      "AppendBatchAcceptedCount" : "0",
      "AppendReceivedCount" : "0",
      "StopTime" : "0",
      "KafkaEventGetTimer" : "17114",
      "KafkaEmptyCount" : "0",
      "EventAcceptedCount" : "0",
      "Type" : "SOURCE",
      "AppendAcceptedCount" : "0"
   },
   "SINK.hdfs-sink-kafka" : {
      "ConnectionFailedCount" : "0",
      "StartTime" : "1454959470054",
      "BatchCompleteCount" : "0",
      "ConnectionClosedCount" : "0",
      "EventDrainSuccessCount" : "0",
      "BatchEmptyCount" : "117",
      "BatchUnderflowCount" : "0",
      "Type" : "SINK",
      "ConnectionCreatedCount" : "0",
      "StopTime" : "0",
      "EventDrainAttemptCount" : "0"
   }
}
--
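
For anyone reading counters like these, here is a rough sketch of how the
same comparison could be automated. It is not part of Flume: diagnose is a
hypothetical helper, and the sample only keeps the handful of counters from
the output above that matter here. Note that the metrics endpoint returns
every counter as a string.

```python
import json

def diagnose(metrics):
    """Return human-readable findings from a Flume /metrics JSON dict."""
    findings = []
    for name, group in metrics.items():
        kind = group.get("Type")
        if kind == "SOURCE":
            received = int(group.get("EventReceivedCount", 0))
            accepted = int(group.get("EventAcceptedCount", 0))
            # Events were read from upstream but none reached the channel.
            if received > 0 and accepted == 0:
                findings.append(f"{name}: received {received} events, accepted 0")
        elif kind == "CHANNEL":
            attempts = int(group.get("EventPutAttemptCount", 0))
            successes = int(group.get("EventPutSuccessCount", 0))
            # Puts were attempted but none succeeded.
            if attempts > 0 and successes == 0:
                findings.append(f"{name}: {attempts} put attempts, 0 succeeded")
    return findings

# Subset of the metrics shown above.
sample = json.loads("""
{
  "CHANNEL.hdfs-channel-kafka": {
    "Type": "CHANNEL",
    "EventPutAttemptCount": "1826",
    "EventPutSuccessCount": "0",
    "ChannelCapacity": "10"
  },
  "SOURCE.kafka-source-test": {
    "Type": "SOURCE",
    "EventReceivedCount": "166000",
    "EventAcceptedCount": "0"
  }
}
""")

for finding in diagnose(sample):
    print(finding)
```

Both checks fire on this data, which suggests events are being read from
Kafka and then lost on the way into the channel, rather than never being
read at all.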

From:  Gonzalo Herreros <gh...@gmail.com>
Reply-To:  <us...@flume.apache.org>
Date:  Monday, February 8, 2016 at 12:29 AM
To:  user <us...@flume.apache.org>
Subject:  Re: KafkaSource not picking up any messages

The only thing I can think of is that the kafka client included in Flume is
not compatible with the kafka version on the brokers (there's been a lot of
changes recently), but normally you get errors when that happens.

On 5 February 2016 at 20:02, Justin Ryan <ju...@ziprealty.com> wrote:
> Thanks, Gonzalo - that def helped!
> 
> This also ties into an issue I'd raised with mesos-kafka where the zk path
> seemed to be ignored, and I now see that there is a node that stores the
> mesos-kafka scheduler config, and the kafka path must be specified separately,
> so is currently '/'.
> 
> Still not reading events, but definitely looks better in startup log:
> 
> 16/02/05 11:55:38 INFO kafka.KafkaSource: Kafka source kafka-source-test do
> started.
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> [flume_mesos04-1454702137146-6cd63609-leader-finder-thread], Starting
> 16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: SOURCE, name: kafka-source-test: Successfully
> registered new MBean.
> 16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Component type:
> SOURCE, name: kafka-source-test started
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Verifying properties
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property client.id is
> overridden to flume
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property
> metadata.broker.list is overridden to
> mesos01:31000,mesos02:31000,mesos08:31000
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property request.timeout.ms
> is overridden to 30000
> 16/02/05 11:55:38 INFO client.ClientUtils$: Fetching metadata from broker
> id:1,host:mesos02,port:31000 with correlation id 0 for 1 topic(s)
> Set(home_views)
> 16/02/05 11:55:38 INFO producer.SyncProducer: Connected to mesos02:31000 for
> producing
> 16/02/05 11:55:38 INFO producer.SyncProducer: Disconnecting from mesos02:31000
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-flume_mesos04-1454702137146-6cd63609-0-0], Starting
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1454702137389] Added fetcher for partitions
> ArrayBuffer([[home_views,0], initOffset -1 to broker
> id:0,host:mesos01,port:31000] )
> 
> --
> 
> $ curl http://mesos04:34545/metrics | json_pp
>   % Total    % Received % Xferd  Average Speed   Time    Time     Time
> Current
>                                  Dload  Upload   Total   Spent    Left  Speed
> 100   925    0   925    0     0   7741      0 --:--:-- --:--:-- --:--:--  7773
> {
>    "CHANNEL.hdfs-channel-kafka" : {
>       "ChannelCapacity" : "10",
>       "StartTime" : "1454702136681",
>       "EventTakeSuccessCount" : "0",
>       "ChannelFillPercentage" : "0.0",
>       "EventPutAttemptCount" : "0",
>       "EventTakeAttemptCount" : "14",
>       "StopTime" : "0",
>       "ChannelSize" : "0",
>       "EventPutSuccessCount" : "0",
>       "Type" : "CHANNEL"
>    },
>    "SOURCE.kafka-source-test" : {
>       "AppendBatchReceivedCount" : "0",
>       "AppendAcceptedCount" : "0",
>       "KafkaEmptyCount" : "0",
>       "AppendReceivedCount" : "0",
>       "KafkaEventGetTimer" : "18046",
>       "EventAcceptedCount" : "0",
>       "StartTime" : "1454702138033",
>       "StopTime" : "0",
>       "KafkaCommitTimer" : "0",
>       "Type" : "SOURCE",
>       "AppendBatchAcceptedCount" : "0",
>       "EventReceivedCount" : "0",
>       "OpenConnectionCount" : "0"
>    },
>    "SINK.hdfs-sink-kafka" : {
>       "ConnectionCreatedCount" : "0",
>       "EventDrainAttemptCount" : "0",
>       "BatchCompleteCount" : "0",
>       "StartTime" : "1454702136714",
>       "Type" : "SINK",
>       "EventDrainSuccessCount" : "0",
>       "StopTime" : "0",
>       "BatchUnderflowCount" : "0",
>       "ConnectionFailedCount" : "0",
>       "BatchEmptyCount" : "13",
>       "ConnectionClosedCount" : "0"
>    }
> }
> 
> 
> From:  Gonzalo Herreros <gh...@gmail.com>
> Reply-To:  <us...@flume.apache.org>
> Date:  Thursday, February 4, 2016 at 11:15 PM
> To:  user <us...@flume.apache.org>
> Subject:  Re: KafkaSource not picking up any messages
> 
> I'm concerned with the warning "no brokers found when trying to rebalance"
> Double check that the path in zookeeper is correct zk01:2181/mesos-kafka and
> it's not the standard /kafka
> 
> When you connect with the kafka-console-consumer, do you specify /mesos-kafka
> or just zk01:2181?
> You can use the zkclient tool to check if there are brokers currently
> registered under that path for the topic "test"
> 
> Regards,
> Gonzalo
> 
> 
> 
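To make the ZooKeeper path question quoted above concrete: Kafka brokers
register themselves under brokers/ids relative to whatever chroot the
connect string names, so the chroot decides where a consumer looks for
them. The helper below is a hypothetical sketch (broker_ids_path is not a
Kafka or Flume function) that only derives the znode to inspect; listing it
still needs a ZooKeeper client of your choice.

```python
def broker_ids_path(zookeeper_connect):
    """Return the znode under which Kafka brokers register their ids.

    `zookeeper_connect` is a connect string such as "zk01:2181/mesos-kafka";
    anything after the first "/" is treated as the chroot.
    """
    _hosts, slash, chroot = zookeeper_connect.partition("/")
    chroot = chroot.strip("/")
    prefix = "/" + chroot if slash and chroot else ""
    return prefix + "/brokers/ids"

# Connect string from the original config (chroot /mesos-kafka).
print(broker_ids_path("zk01:2181/mesos-kafka"))
# Same ensemble without a chroot, i.e. Kafka rooted at '/'.
print(broker_ids_path("zk01:2181"))
```

If the first path has no children while the second does, the consumer is
pointed at the wrong chroot, which would fit the "no brokers found when
trying to rebalance" warning.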




Re: KafkaSource not picking up any messages

Posted by Justin Ryan <ju...@ziprealty.com>.
This was actually generating errors: the memory channel had been configured
with a very low capacity, on the theory that this would force it to flush
more often or something. That has now been fixed.
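
A rough sketch of the kind of sanity check that would have flagged this
early. The helper names are hypothetical, and the batch size of 1000 is an
assumption picked to match the round 1-2k increments described earlier; the
thread never states the actual batch size.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

def channel_warnings(props, agent, channel, batch_size):
    """Flag channel limits that are smaller than the source batch size."""
    prefix = f"{agent}.channels.{channel}."
    warnings = []
    for key in ("capacity", "transactionCapacity"):
        value = props.get(prefix + key)
        if value is not None and int(value) < batch_size:
            warnings.append(f"{key}={value} is below batch size {batch_size}")
    return warnings

# Channel settings from the original config in this thread.
config = """
flume1.channels.hdfs-channel-kafka.type=memory
flume1.channels.hdfs-channel-kafka.capacity=10
flume1.channels.hdfs-channel-kafka.transactionCapacity=10
"""

# ASSUMPTION: 1000 events per batch; not stated anywhere in the thread.
ASSUMED_BATCH_SIZE = 1000
for warning in channel_warnings(
    parse_properties(config), "flume1", "hdfs-channel-kafka", ASSUMED_BATCH_SIZE
):
    print(warning)
```

Under that assumption both limits are flagged, since a batch larger than
the channel's transaction capacity cannot be put in one transaction.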

Now I'm on to a challenge I think I understand just fine, or fine enough:
HDFS permissions.

Thanks, Gonzalo, for the input and for being a sounding board!

Justin

From:  Justin Ryan <ju...@ziprealty.com>
Date:  Monday, February 8, 2016 at 11:52 AM
To:  <us...@flume.apache.org>
Subject:  Re: KafkaSource not picking up any messages

So it looks like Flume is built against Kafka 2.10-0.8.1.1, and I'm using
2.10-0.8.2.2, which kafka-mesos leans on.

Any likelihood of this being incompatible?

I did notice this morning that Flume has an "EventReceivedCount" that
increments 1-2k at a time and is always a round number, but the
EventAcceptedCount is 0.

When I pipe dmesg to the console producer, and then try to read it with the
console consumer, I have to tell it to start from the beginning, so it seems
like flume may be reading events and moving the marker, but not doing
anything with them.

‹
{
   "CHANNEL.hdfs-channel-kafka" : {
      "EventTakeAttemptCount" : "117",
      "EventTakeSuccessCount" : "0",
      "StartTime" : "1454959470045",
      "ChannelFillPercentage" : "0.0",
      "ChannelSize" : "0",
      "EventPutSuccessCount" : "0",
      "Type" : "CHANNEL",
      "EventPutAttemptCount" : "1826",
      "ChannelCapacity" : "10",
      "StopTime" : "0"
   },
   "SOURCE.kafka-source-test" : {
      "StartTime" : "1454959471241",
      "OpenConnectionCount" : "0",
      "AppendBatchReceivedCount" : "0",
      "EventReceivedCount" : "166000",
      "KafkaCommitTimer" : "0",
      "AppendBatchAcceptedCount" : "0",
      "AppendReceivedCount" : "0",
      "StopTime" : "0",
      "KafkaEventGetTimer" : "17114",
      "KafkaEmptyCount" : "0",
      "EventAcceptedCount" : "0",
      "Type" : "SOURCE",
      "AppendAcceptedCount" : "0"
   },
   "SINK.hdfs-sink-kafka" : {
      "ConnectionFailedCount" : "0",
      "StartTime" : "1454959470054",
      "BatchCompleteCount" : "0",
      "ConnectionClosedCount" : "0",
      "EventDrainSuccessCount" : "0",
      "BatchEmptyCount" : "117",
      "BatchUnderflowCount" : "0",
      "Type" : "SINK",
      "ConnectionCreatedCount" : "0",
      "StopTime" : "0",
      "EventDrainAttemptCount" : "0"
   }
}
--

From:  Gonzalo Herreros <gh...@gmail.com>
Reply-To:  <us...@flume.apache.org>
Date:  Monday, February 8, 2016 at 12:29 AM
To:  user <us...@flume.apache.org>
Subject:  Re: KafkaSource not picking up any messages

The only thing I can think of is that the kafka client included in Kafka is
not compatible with the kafka version on the brokers (there's been a lot of
changes recently), but normally you get errors when that happens.

On 5 February 2016 at 20:02, Justin Ryan <ju...@ziprealty.com> wrote:
> Thanks, Gonzalo ­ that def helped!
> 
> This also ties into an issue I¹d raised with mesos-kafka where the zk path
> seemed to be ignored, and I now see that there is a node that stores the
> mesos-kafka scheduler config, and the kafka path must be specified separately,
> so is currently Œ/Œ.
> 
> Still not reading events, but definitely looks better in startup log:
> 
> 16/02/05 11:55:38 INFO kafka.KafkaSource: Kafka source kafka-source-test do
> started.
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> [flume_mesos04-1454702137146-6cd63609-leader-finder-thread], Starting
> 16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: SOURCE, name: kafka-source-test: Successfully
> registered new MBean.
> 16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Component type:
> SOURCE, name: kafka-source-test started
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Verifying properties
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property client.id
> <http://client.id>  is overridden to flume
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property
> metadata.broker.list is overridden to
> mesos01:31000,mesos02:31000,mesos08:31000
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property request.timeout.ms
> <http://request.timeout.ms>  is overridden to 30000
> 16/02/05 11:55:38 INFO client.ClientUtils$: Fetching metadata from broker
> id:1,host:mesos02,port:31000 with correlation id 0 for 1 topic(s)
> Set(home_views)
> 16/02/05 11:55:38 INFO producer.SyncProducer: Connected to mesos02:31000 for
> producing
> 16/02/05 11:55:38 INFO producer.SyncProducer: Disconnecting from mesos02:31000
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-flume_mesos04-1454702137146-6cd63609-0-0], Starting
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1454702137389] Added fetcher for partitions
> ArrayBuffer([[home_views,0], initOffset -1 to broker
> id:0,host:mesos01,port:31000] )
> 
> ‹
> 
> $ curl http://mesos04:34545/metrics | json_pp
>   % Total    % Received % Xferd  Average Speed   Time    Time     Time
> Current
>                                  Dload  Upload   Total   Spent    Left  Speed
> 100   925    0   925    0     0   7741      0 --:--:-- --:--:-- --:--:--  7773
> {
>    "CHANNEL.hdfs-channel-kafka" : {
>       "ChannelCapacity" : "10",
>       "StartTime" : "1454702136681",
>       "EventTakeSuccessCount" : "0",
>       "ChannelFillPercentage" : "0.0",
>       "EventPutAttemptCount" : "0",
>       "EventTakeAttemptCount" : "14",
>       "StopTime" : "0",
>       "ChannelSize" : "0",
>       "EventPutSuccessCount" : "0",
>       "Type" : "CHANNEL"
>    },
>    "SOURCE.kafka-source-test" : {
>       "AppendBatchReceivedCount" : "0",
>       "AppendAcceptedCount" : "0",
>       "KafkaEmptyCount" : "0",
>       "AppendReceivedCount" : "0",
>       "KafkaEventGetTimer" : "18046",
>       "EventAcceptedCount" : "0",
>       "StartTime" : "1454702138033",
>       "StopTime" : "0",
>       "KafkaCommitTimer" : "0",
>       "Type" : "SOURCE",
>       "AppendBatchAcceptedCount" : "0",
>       "EventReceivedCount" : "0",
>       "OpenConnectionCount" : "0"
>    },
>    "SINK.hdfs-sink-kafka" : {
>       "ConnectionCreatedCount" : "0",
>       "EventDrainAttemptCount" : "0",
>       "BatchCompleteCount" : "0",
>       "StartTime" : "1454702136714",
>       "Type" : "SINK",
>       "EventDrainSuccessCount" : "0",
>       "StopTime" : "0",
>       "BatchUnderflowCount" : "0",
>       "ConnectionFailedCount" : "0",
>       "BatchEmptyCount" : "13",
>       "ConnectionClosedCount" : "0"
>    }
> }
> 
> 
> From:  Gonzalo Herreros <gh...@gmail.com>
> Reply-To:  <us...@flume.apache.org>
> Date:  Thursday, February 4, 2016 at 11:15 PM
> To:  user <us...@flume.apache.org>
> Subject:  Re: KafkaSource not picking up any messages
> 
> I'm concerned with the warning "no brokers found when trying to rebalance"
> Double check that the path in zookeeper is correct zk01:2181/mesos-kafka and
> it's not the standard /kafka
> 
> When you connect with the kafka-console-consumer, do you specify /mesos-kafka
> or just zk01:2181?
> You can use the zkclient tool to check if there are brokers currently
> registered under that path for the topic "test"
> 
> Regards,
> Gonzalo
> 
> 
> On 4 February 2016 at 21:16, Justin Ryan <ju...@ziprealty.com> wrote:
>> Hiya folks,
>> 
>> I'm setting up a new environment with Kafka, Flume, and HDFS, and have
>> implemented the simplest possible testing configuration I can come up with.
>> It logs successfully configuring and starting the KafkaSource, and with kafka
>> tools I can confirm that messages have been sent, but the JSON Metrics from
>> Flume show 0 messages processed.
>> 
>> Are there any more tools at my disposal to investigate? Any assistance would
>> be greatly appreciated!
>> 
>> My config and log:
>> 
>> --
>> # generated by Chef for mesos10, changes will be overwritten
>> 
>> flume1.sources=kafka-source-test
>> flume1.channels=hdfs-channel-kafka
>> flume1.sinks=hdfs-sink-kafka
>> flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
>> flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
>> flume1.sources.kafka-source-test.topic=test
>> flume1.sources.kafka-source-test.groupId=flume
>> flume1.sources.kafka-source-test.interceptors=i1
>> flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
>> flume1.sources.kafka-source-test.consumer.timeout.ms=100
>> flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
>> flume1.channels.hdfs-channel-kafka.type=memory
>> flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
>> flume1.sinks.hdfs-sink-kafka.type=hdfs
>> flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
>> flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
>> flume1.channels.hdfs-channel-kafka.capacity=10
>> flume1.channels.hdfs-channel-kafka.transactionCapacity=10
>> --
>> 
>> Startup log (less incredibly long path lines):
>> --
>> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
>> Configuration provider starting
>> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
>> Reloading configuration file:/etc/flume/conf.chef/flume.conf
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink-kafka
>> Agent: flume1
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume
>> configuration contains configuration for agents: [flume1]
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating channels
>> 16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of
>> channel hdfs-channel-kafka type memory
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel
>> hdfs-channel-kafka
>> 16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of
>> source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
>> 16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={
>> parameters:{interceptors.i1.type=timestamp,
>> zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka,
>> groupId=flume, consumer.timeout.ms=100,
>> topic=test, type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1}
>> }
>> 16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink:
>> hdfs-sink-kafka, type: hdfs
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel
>> hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
>> 16/02/04 11:32:07 INFO node.Application: Starting new configuration:{
>> sourceRunners:{kafka-source-test=PollableSourceRunner: {
>> source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state
>> :IDLE} counterGroup:{ name:null counters:{} } }}
>> sinkRunners:{hdfs-sink-kafka=SinkRunner: {
>> policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{
>> name:null counters:{} } }}
>> channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name:
>> hdfs-channel-kafka}} }
>> 16/02/04 11:32:07 INFO node.Application: Starting Channel hdfs-channel-kafka
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
>> counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully
>> registered new MBean.
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type:
>> CHANNEL, name: hdfs-channel-kafka started
>> 16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
>> 16/02/04 11:32:07 INFO kafka.KafkaSource: Starting
>> org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}.
>> ..
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
>> counter group for type: SINK, name: hdfs-sink-kafka: Successfully registered
>> new MBean.
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type:
>> SINK, name: hdfs-sink-kafka started
>> 16/02/04 11:32:07 INFO mortbay.log: Logging to
>> org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via
>> org.mortbay.log.Slf4jLog
>> 16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
>> 16/02/04 11:32:07 INFO mortbay.log: Started
>> SelectChannelConnector@0.0.0.0:34545
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
>> auto.commit.enable is overridden to false
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
>> consumer.timeout.ms is overridden to 10
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id
>> is overridden to flume
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property zookeeper.connect
>> is overridden to zk01:2181/mesos-kafka
>> 16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at
>> zk01:2181/mesos-kafka
>> 16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event
>> thread.
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:host.name=mesos10
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.version=1.8.0_72-internal
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.vendor=Oracle Corporation
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.io.tmpdir=/tmp
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.compiler=<NA>
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:os.name=Linux
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:os.version=3.13.0-63-generic
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:user.name=marathon
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:user.home=/opt/marathon
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection,
>> connectString=zk01:2181/mesos-kafka sessionTimeout=6000
>> watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to
>> server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate
>> using SASL (unknown error)
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established to
>> 10.100.6.251/10.100.6.251:2181, initiating session
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment complete
>> on server 10.100.6.251/10.100.6.251:2181, sessionid = 0x152858b1cc07491,
>> negotiated timeout = 6000
>> 16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed
>> (SyncConnected)
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], begin registering consumer
>> flume_mesos10-1454614328204-ca8a74df in ZK
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], end registering consumer
>> flume_mesos10-1454614328204-ca8a74df in ZK
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread for
>> consumer flume_mesos10-1454614328204-ca8a74df
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer
>> flume_mesos10-1454614328204-ca8a74df try #0
>> 16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to
>> rebalance.
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer
>> flume_mesos10-1454614328204-ca8a74df try #0
>> 16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test do
>> started.
>> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored
>> counter group for type: SOURCE, name: kafka-source-test: Successfully
>> registered new MBean.
>> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component type:
>> SOURCE, name: kafka-source-test started
>> --
>> 
>> --
>> Justin Alan Ryan
>> Sr. Systems / Release Engineer
>> ZipRealty
> 




Re: KafkaSource not picking up any messages

Posted by Gonzalo Herreros <gh...@gmail.com>.
The only thing I can think of is that the Kafka client included in Flume is
not compatible with the Kafka version on the brokers (there have been a lot
of changes recently), but normally you get errors when that happens.

On 5 February 2016 at 20:02, Justin Ryan <ju...@ziprealty.com> wrote:

> Thanks, Gonzalo – that def helped!
>
> This also ties into an issue I’d raised with mesos-kafka where the zk path
> seemed to be ignored, and I now see that there is a node that stores the
> mesos-kafka scheduler config, and the kafka path must be specified
> separately, so is currently '/'.
>
> Still not reading events, but definitely looks better in startup log:
>
> 16/02/05 11:55:38 INFO kafka.KafkaSource: Kafka source kafka-source-test
> do started.
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> [flume_mesos04-1454702137146-6cd63609-leader-finder-thread], Starting
> 16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: SOURCE, name: kafka-source-test: Successfully
> registered new MBean.
> 16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Component
> type: SOURCE, name: kafka-source-test started
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Verifying properties
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property client.id is
> overridden to flume
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property
> metadata.broker.list is overridden to
> mesos01:31000,mesos02:31000,mesos08:31000
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property
> request.timeout.ms is overridden to 30000
> 16/02/05 11:55:38 INFO client.ClientUtils$: Fetching metadata from broker
> id:1,host:mesos02,port:31000 with correlation id 0 for 1 topic(s)
> Set(home_views)
> 16/02/05 11:55:38 INFO producer.SyncProducer: Connected to mesos02:31000
> for producing
> 16/02/05 11:55:38 INFO producer.SyncProducer: Disconnecting from
> mesos02:31000
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-flume_mesos04-1454702137146-6cd63609-0-0], Starting
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1454702137389] Added fetcher for partitions
> ArrayBuffer([[home_views,0], initOffset -1 to broker
> id:0,host:mesos01,port:31000] )
>
> —
>
> $ curl http://mesos04:34545/metrics | json_pp
>   % Total    % Received % Xferd  Average Speed   Time    Time     Time
>  Current
>                                  Dload  Upload   Total   Spent    Left
>  Speed
> 100   925    0   925    0     0   7741      0 --:--:-- --:--:-- --:--:--
>  7773
> {
>    "CHANNEL.hdfs-channel-kafka" : {
>       "ChannelCapacity" : "10",
>       "StartTime" : "1454702136681",
>       "EventTakeSuccessCount" : "0",
>       "ChannelFillPercentage" : "0.0",
>       "EventPutAttemptCount" : "0",
>       "EventTakeAttemptCount" : "14",
>       "StopTime" : "0",
>       "ChannelSize" : "0",
>       "EventPutSuccessCount" : "0",
>       "Type" : "CHANNEL"
>    },
>    "SOURCE.kafka-source-test" : {
>       "AppendBatchReceivedCount" : "0",
>       "AppendAcceptedCount" : "0",
>       "KafkaEmptyCount" : "0",
>       "AppendReceivedCount" : "0",
>       "KafkaEventGetTimer" : "18046",
>       "EventAcceptedCount" : "0",
>       "StartTime" : "1454702138033",
>       "StopTime" : "0",
>       "KafkaCommitTimer" : "0",
>       "Type" : "SOURCE",
>       "AppendBatchAcceptedCount" : "0",
>       "EventReceivedCount" : "0",
>       "OpenConnectionCount" : "0"
>    },
>    "SINK.hdfs-sink-kafka" : {
>       "ConnectionCreatedCount" : "0",
>       "EventDrainAttemptCount" : "0",
>       "BatchCompleteCount" : "0",
>       "StartTime" : "1454702136714",
>       "Type" : "SINK",
>       "EventDrainSuccessCount" : "0",
>       "StopTime" : "0",
>       "BatchUnderflowCount" : "0",
>       "ConnectionFailedCount" : "0",
>       "BatchEmptyCount" : "13",
>       "ConnectionClosedCount" : "0"
>    }
> }
>
>
> From: Gonzalo Herreros <gh...@gmail.com>
> Reply-To: <us...@flume.apache.org>
> Date: Thursday, February 4, 2016 at 11:15 PM
> To: user <us...@flume.apache.org>
> Subject: Re: KafkaSource not picking up any messages
>
> I'm concerned with the warning "no brokers found when trying to rebalance"
> Double check that the path in zookeeper is correct zk01:2181/mesos-kafka
> and it's not the standard /kafka
>
> When you connect with the kafka-console-consumer, do you specify
> /mesos-kafka or just zk01:2181?
> You can use the zkclient tool to check if there are brokers currently
> registered under that path for the topic "test"
>
> Regards,
> Gonzalo
>
>
> On 4 February 2016 at 21:16, Justin Ryan <ju...@ziprealty.com> wrote:
>
>> Hiya folks,
>>
>> I’m setting up a new environment with Kafka, Flume, and HDFS, and have
>> implemented the simplest possible testing configuration I can come up
>> with.  It logs successfully configuring and starting the KafkaSource, and
>> with kafka tools I can confirm that messages have been sent, but the JSON
>> Metrics from Flume show 0 messages processed.
>>
>> Are there any more tools at my disposal to investigate? Any assistance
>> would be greatly appreciated!
>>
>> My config and log:
>>
>> —
>> # generated by Chef for mesos10, changes will be overwritten
>>
>> flume1.sources=kafka-source-test
>> flume1.channels=hdfs-channel-kafka
>> flume1.sinks=hdfs-sink-kafka
>>
>> flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
>> flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
>> flume1.sources.kafka-source-test.topic=test
>> flume1.sources.kafka-source-test.groupId=flume
>> flume1.sources.kafka-source-test.interceptors=i1
>> flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
>> flume1.sources.kafka-source-test.consumer.timeout.ms=100
>> flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
>> flume1.channels.hdfs-channel-kafka.type=memory
>> flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
>> flume1.sinks.hdfs-sink-kafka.type=hdfs
>> flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
>> flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
>> flume1.channels.hdfs-channel-kafka.capacity=10
>> flume1.channels.hdfs-channel-kafka.transactionCapacity=10
>> —
>>
>> Startup log (less incredibly long path lines):
>> —
>> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
>> Configuration provider starting
>> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
>> Reloading configuration file:/etc/flume/conf.chef/flume.conf
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks:
>> hdfs-sink-kafka Agent: flume1
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume
>> configuration contains configuration for agents: [flume1]
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating
>> channels
>> 16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance
>> of channel hdfs-channel-kafka type memory
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created
>> channel hdfs-channel-kafka
>> 16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of
>> source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
>> 16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={
>> parameters:{interceptors.i1.type=timestamp,
>> zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka,
>> groupId=flume, consumer.timeout.ms=100, topic=test,
>> type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1} }
>> 16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of
>> sink: hdfs-sink-kafka, type: hdfs
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel
>> hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
>> 16/02/04 11:32:07 INFO node.Application: Starting new configuration:{
>> sourceRunners:{kafka-source-test=PollableSourceRunner: {
>> source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}
>> counterGroup:{ name:null counters:{} } }}
>> sinkRunners:{hdfs-sink-kafka=SinkRunner: {
>> policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e
>> counterGroup:{ name:null counters:{} } }}
>> channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name:
>> hdfs-channel-kafka}} }
>> 16/02/04 11:32:07 INFO node.Application: Starting Channel
>> hdfs-channel-kafka
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
>> counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully
>> registered new MBean.
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component
>> type: CHANNEL, name: hdfs-channel-kafka started
>> 16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
>> 16/02/04 11:32:07 INFO kafka.KafkaSource: Starting
>> org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}...
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
>> counter group for type: SINK, name: hdfs-sink-kafka: Successfully
>> registered new MBean.
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component
>> type: SINK, name: hdfs-sink-kafka started
>> 16/02/04 11:32:07 INFO mortbay.log: Logging to
>> org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via
>> org.mortbay.log.Slf4jLog
>> 16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
>> 16/02/04 11:32:07 INFO mortbay.log: Started
>> SelectChannelConnector@0.0.0.0:34545
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
>> auto.commit.enable is overridden to false
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
>> consumer.timeout.ms is overridden to 10
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is
>> overridden to flume
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
>> zookeeper.connect is overridden to zk01:2181/mesos-kafka
>> 16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at
>> zk01:2181/mesos-kafka
>> 16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event
>> thread.
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:host.name
>> =mesos10
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.version=1.8.0_72-internal
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.vendor=Oracle Corporation
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.io.tmpdir=/tmp
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.compiler=<NA>
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.name
>> =Linux
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:os.arch=amd64
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:os.version=3.13.0-63-generic
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.name
>> =marathon
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:user.home=/opt/marathon
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection,
>> connectString=zk01:2181/mesos-kafka sessionTimeout=6000
>> watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to
>> server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate
>> using SASL (unknown error)
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection
>> established to 10.100.6.251/10.100.6.251:2181, initiating session
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment
>> complete on server 10.100.6.251/10.100.6.251:2181, sessionid =
>> 0x152858b1cc07491, negotiated timeout = 6000
>> 16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed
>> (SyncConnected)
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], begin registering consumer
>> flume_mesos10-1454614328204-ca8a74df in ZK
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], end registering consumer
>> flume_mesos10-1454614328204-ca8a74df in ZK
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread
>> for consumer flume_mesos10-1454614328204-ca8a74df
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer
>> flume_mesos10-1454614328204-ca8a74df try #0
>> 16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to
>> rebalance.
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer
>> flume_mesos10-1454614328204-ca8a74df try #0
>> 16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test
>> do started.
>> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored
>> counter group for type: SOURCE, name: kafka-source-test: Successfully
>> registered new MBean.
>> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component
>> type: SOURCE, name: kafka-source-test started
>> --
>>
>> --
>> Justin Alan Ryan
>> Sr. Systems / Release Engineer
>> ZipRealty
>>
>
>

Re: KafkaSource not picking up any messages

Posted by Justin Ryan <ju...@ziprealty.com>.
Thanks, Gonzalo - that def helped!

This also ties into an issue I'd raised with mesos-kafka where the zk path
seemed to be ignored, and I now see that there is a node that stores the
mesos-kafka scheduler config, and the kafka path must be specified
separately, so it is currently '/'.
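
As a side note on that path: Kafka brokers register themselves under
/brokers/ids relative to whatever chroot follows the host list in the
connect string, so the chroot decides which znode a consumer searches. A
minimal Python sketch of that mapping follows; the helper name
broker_registry_path is made up for illustration and is not part of Flume
or Kafka.

```python
def broker_registry_path(zookeeper_connect: str) -> str:
    """Return the znode where brokers register for a given connect string.

    Hypothetical helper for illustration only. It assumes the usual
    "host:port[,host:port...][/chroot]" connect-string layout.
    """
    # Everything after the first "/" is the chroot; the hosts come before it.
    hosts, slash, chroot = zookeeper_connect.partition("/")
    # Normalise so that "zk01:2181" and "zk01:2181/" both mean "no chroot".
    chroot = ("/" + chroot).rstrip("/") if slash else ""
    return chroot + "/brokers/ids"


print(broker_registry_path("zk01:2181/mesos-kafka"))  # /mesos-kafka/brokers/ids
print(broker_registry_path("zk01:2181"))              # /brokers/ids
```

Listing the resulting znode with any ZooKeeper client should show one child
per live broker; an empty or missing node would be consistent with the
earlier "no brokers found when trying to rebalance" warning.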

Still not reading events, but definitely looks better in startup log:

16/02/05 11:55:38 INFO kafka.KafkaSource: Kafka source kafka-source-test do
started.
16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
[flume_mesos04-1454702137146-6cd63609-leader-finder-thread], Starting
16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Monitored
counter group for type: SOURCE, name: kafka-source-test: Successfully
registered new MBean.
16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Component
type: SOURCE, name: kafka-source-test started
16/02/05 11:55:38 INFO utils.VerifiableProperties: Verifying properties
16/02/05 11:55:38 INFO utils.VerifiableProperties: Property client.id is
overridden to flume
16/02/05 11:55:38 INFO utils.VerifiableProperties: Property
metadata.broker.list is overridden to
mesos01:31000,mesos02:31000,mesos08:31000
16/02/05 11:55:38 INFO utils.VerifiableProperties: Property
request.timeout.ms is overridden to 30000
16/02/05 11:55:38 INFO client.ClientUtils$: Fetching metadata from broker
id:1,host:mesos02,port:31000 with correlation id 0 for 1 topic(s)
Set(home_views)
16/02/05 11:55:38 INFO producer.SyncProducer: Connected to mesos02:31000 for
producing
16/02/05 11:55:38 INFO producer.SyncProducer: Disconnecting from
mesos02:31000
16/02/05 11:55:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-flume_mesos04-1454702137146-6cd63609-0-0], Starting
16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1454702137389] Added fetcher for partitions
ArrayBuffer([[home_views,0], initOffset -1 to broker
id:0,host:mesos01,port:31000] )

--

$ curl http://mesos04:34545/metrics | json_pp
  % Total    % Received % Xferd  Average Speed   Time    Time     Time
Current
                                 Dload  Upload   Total   Spent    Left
Speed
100   925    0   925    0     0   7741      0 --:--:-- --:--:-- --:--:--
7773
{
   "CHANNEL.hdfs-channel-kafka" : {
      "ChannelCapacity" : "10",
      "StartTime" : "1454702136681",
      "EventTakeSuccessCount" : "0",
      "ChannelFillPercentage" : "0.0",
      "EventPutAttemptCount" : "0",
      "EventTakeAttemptCount" : "14",
      "StopTime" : "0",
      "ChannelSize" : "0",
      "EventPutSuccessCount" : "0",
      "Type" : "CHANNEL"
   },
   "SOURCE.kafka-source-test" : {
      "AppendBatchReceivedCount" : "0",
      "AppendAcceptedCount" : "0",
      "KafkaEmptyCount" : "0",
      "AppendReceivedCount" : "0",
      "KafkaEventGetTimer" : "18046",
      "EventAcceptedCount" : "0",
      "StartTime" : "1454702138033",
      "StopTime" : "0",
      "KafkaCommitTimer" : "0",
      "Type" : "SOURCE",
      "AppendBatchAcceptedCount" : "0",
      "EventReceivedCount" : "0",
      "OpenConnectionCount" : "0"
   },
   "SINK.hdfs-sink-kafka" : {
      "ConnectionCreatedCount" : "0",
      "EventDrainAttemptCount" : "0",
      "BatchCompleteCount" : "0",
      "StartTime" : "1454702136714",
      "Type" : "SINK",
      "EventDrainSuccessCount" : "0",
      "StopTime" : "0",
      "BatchUnderflowCount" : "0",
      "ConnectionFailedCount" : "0",
      "BatchEmptyCount" : "13",
      "ConnectionClosedCount" : "0"
   }
}
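
For reading a dump like the one above, here is a rough Python sketch that
walks the source -> channel -> sink counters and reports the first stage
that has seen no events. The function name stalled_stage and the choice of
counters are assumptions made for illustration, not a Flume API; the sample
values are copied from the output above.

```python
import json


def stalled_stage(metrics: dict) -> str:
    """Return the first pipeline stage whose success counter is still zero."""

    def total(kind: str, counter: str) -> int:
        # Sum one counter over every component of the given type.
        return sum(
            int(group.get(counter, 0))
            for name, group in metrics.items()
            if name.startswith(kind + ".")
        )

    if total("SOURCE", "EventReceivedCount") == 0:
        return "source"
    if total("CHANNEL", "EventPutSuccessCount") == 0:
        return "channel"
    if total("SINK", "EventDrainSuccessCount") == 0:
        return "sink"
    return "none"


# Subset of the counters from the metrics output above.
sample = json.loads("""
{
  "CHANNEL.hdfs-channel-kafka": {"EventPutAttemptCount": "0",
                                 "EventPutSuccessCount": "0",
                                 "EventTakeAttemptCount": "14"},
  "SOURCE.kafka-source-test": {"EventReceivedCount": "0",
                               "EventAcceptedCount": "0",
                               "KafkaEmptyCount": "0"},
  "SINK.hdfs-sink-kafka": {"EventDrainSuccessCount": "0",
                           "BatchEmptyCount": "13"}
}
""")

print(stalled_stage(sample))  # source
```

With these numbers the sink is polling (BatchEmptyCount is 13) and the
channel has never been offered an event, so the gap sits between Kafka and
the source rather than further down the pipeline.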


From:  Gonzalo Herreros <gh...@gmail.com>
Reply-To:  <us...@flume.apache.org>
Date:  Thursday, February 4, 2016 at 11:15 PM
To:  user <us...@flume.apache.org>
Subject:  Re: KafkaSource not picking up any messages

I'm concerned with the warning "no brokers found when trying to rebalance"
Double check that the path in zookeeper is correct zk01:2181/mesos-kafka and
it's not the standard /kafka

When you connect with the kafka-console-consumer, do you specify
/mesos-kafka or just zk01:2181?
You can use the zkclient tool to check if there are brokers currently
registered under that path for the topic "test"

Regards,
Gonzalo


On 4 February 2016 at 21:16, Justin Ryan <ju...@ziprealty.com> wrote:
> Hiya folks,
> 
> I'm setting up a new environment with Kafka, Flume, and HDFS, and have
> implemented the simplest possible testing configuration I can come up with.
> It logs successfully configuring and starting the KafkaSource, and with kafka
> tools I can confirm that messages have been sent, but the JSON Metrics from
> Flume show 0 messages processed.
> 
> Are there any more tools at my disposal to investigate? Any assistance would
> be greatly appreciated!
> 
> My config and log:
> 
> --
> # generated by Chef for mesos10, changes will be overwritten
> 
> flume1.sources=kafka-source-test
> flume1.channels=hdfs-channel-kafka
> flume1.sinks=hdfs-sink-kafka
> 
> flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
> flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
> flume1.sources.kafka-source-test.topic=test
> flume1.sources.kafka-source-test.groupId=flume
> flume1.sources.kafka-source-test.interceptors=i1
> flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
> flume1.sources.kafka-source-test.consumer.timeout.ms=100
> flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
> flume1.channels.hdfs-channel-kafka.type=memory
> flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
> flume1.sinks.hdfs-sink-kafka.type=hdfs
> flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
> flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
> flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
> flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
> flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
> flume1.channels.hdfs-channel-kafka.capacity=10
> flume1.channels.hdfs-channel-kafka.transactionCapacity=10
> ‹
> 
> Startup log (less incredibly long path lines):
> ‹
> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
> Configuration provider starting
> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
> Reloading configuration file:/etc/flume/conf.chef/flume.conf
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink-kafka
> Agent: flume1
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume
> configuration contains configuration for agents: [flume1]
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating channels
> 16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of
> channel hdfs-channel-kafka type memory
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel
> hdfs-channel-kafka
> 16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of
> source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
> 16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={
> parameters:{interceptors.i1.type=timestamp,
> zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka,
> groupId=flume, consumer.timeout.ms=100,
> topic=test, type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1} }
> 16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink:
> hdfs-sink-kafka, type: hdfs
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel
> hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
> 16/02/04 11:32:07 INFO node.Application: Starting new configuration:{
> sourceRunners:{kafka-source-test=PollableSourceRunner: {
> source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:
> IDLE} counterGroup:{ name:null counters:{} } }}
> sinkRunners:{hdfs-sink-kafka=SinkRunner: {
> policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{
> name:null counters:{} } }}
> channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name:
> hdfs-channel-kafka}} }
> 16/02/04 11:32:07 INFO node.Application: Starting Channel hdfs-channel-kafka
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully
> registered new MBean.
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type:
> CHANNEL, name: hdfs-channel-kafka started
> 16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
> 16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
> 16/02/04 11:32:07 INFO kafka.KafkaSource: Starting
> org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}...
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: SINK, name: hdfs-sink-kafka: Successfully registered
> new MBean.
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type:
> SINK, name: hdfs-sink-kafka started
> 16/02/04 11:32:07 INFO mortbay.log: Logging to
> org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via
> org.mortbay.log.Slf4jLog
> 16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
> 16/02/04 11:32:07 INFO mortbay.log: Started
> SelectChannelConnector@0.0.0.0:34545
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property auto.commit.enable
> is overridden to false
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
> consumer.timeout.ms is overridden to 10
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id
> is overridden to flume
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property zookeeper.connect
> is overridden to zk01:2181/mesos-kafka
> 16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at
> zk01:2181/mesos-kafka
> 16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:host.name=mesos10
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.version=1.8.0_72-internal
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.vendor=Oracle Corporation
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.io.tmpdir=/tmp
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.compiler=<NA>
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:os.name=Linux
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:os.version=3.13.0-63-generic
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:user.name=marathon
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:user.home=/opt/marathon
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection,
> connectString=zk01:2181/mesos-kafka sessionTimeout=6000
> watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to
> server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate
> using SASL (unknown error)
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established to
> 10.100.6.251/10.100.6.251:2181, initiating session
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment complete on
> server 10.100.6.251/10.100.6.251:2181, sessionid = 0x152858b1cc07491,
> negotiated timeout = 6000
> 16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed
> (SyncConnected)
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], begin registering consumer
> flume_mesos10-1454614328204-ca8a74df in ZK
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], end registering consumer
> flume_mesos10-1454614328204-ca8a74df in ZK
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread for
> consumer flume_mesos10-1454614328204-ca8a74df
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer
> flume_mesos10-1454614328204-ca8a74df try #0
> 16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to
> rebalance.
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer
> flume_mesos10-1454614328204-ca8a74df try #0
> 16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test do
> started.
> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: SOURCE, name: kafka-source-test: Successfully
> registered new MBean.
> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component type:
> SOURCE, name: kafka-source-test started
> --
> 
> --
> Justin Alan Ryan
> Sr. Systems / Release Engineer
> ZipRealty




Re: KafkaSource not picking up any messages

Posted by Gonzalo Herreros <gh...@gmail.com>.
I'm concerned about the warning "no brokers found when trying to rebalance".
Double-check that the ZooKeeper path zk01:2181/mesos-kafka is correct and that
it isn't actually the standard /kafka.

When you connect with the kafka-console-consumer, do you specify
/mesos-kafka or just zk01:2181?
You can use the zkclient tool to check whether any brokers are currently
registered under that path for the topic "test".
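
To make the check concrete, here is a small sketch of where to look. It assumes the usual Kafka layout in ZooKeeper, where brokers register under `<chroot>/brokers/ids` and topics under `<chroot>/brokers/topics/<topic>`; the helper name `kafka_znode_paths` is made up for illustration and it does not talk to ZooKeeper itself, it only works out the paths to inspect (for example with `ls` in the ZooKeeper command-line client).

```python
def kafka_znode_paths(zookeeper_connect, topic):
    """Split a ZooKeeper connect string into hosts and chroot, and list the znodes to inspect."""
    # Everything after the first "/" is the chroot; the part before it is the host list.
    hosts, _, chroot = zookeeper_connect.partition("/")
    chroot = chroot.strip("/")
    root = "/" + chroot if chroot else ""
    return {
        "hosts": hosts.split(","),
        # Live brokers should appear as children of this znode.
        "broker_ids": root + "/brokers/ids",
        # The topic's metadata should live under this znode.
        "topic": root + "/brokers/topics/" + topic,
    }


if __name__ == "__main__":
    for name, value in kafka_znode_paths("zk01:2181/mesos-kafka", "test").items():
        print(name, value)
```

If the broker ids znode has no children under the path Flume is configured with, the consumer has nothing to rebalance against, which would match the warning.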

Regards,
Gonzalo


On 4 February 2016 at 21:16, Justin Ryan <ju...@ziprealty.com> wrote:

> Hiya folks,
>
> I’m setting up a new environment with Kafka, Flume, and HDFS, and have
> implemented the simplest possible testing configuration I can come up
> with.  It logs successfully configuring and starting the KafkaSource, and
> with kafka tools I can confirm that messages have been sent, but the JSON
> Metrics from Flume show 0 messages processed.
>
> Are there any more tools at my disposal to investigate? Any assistance
> would be greatly appreciated!
>
> My config and log:
>
> —
> # generated by Chef for mesos10, changes will be overwritten
>
> flume1.sources=kafka-source-test
> flume1.channels=hdfs-channel-kafka
> flume1.sinks=hdfs-sink-kafka
>
> flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
> flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
> flume1.sources.kafka-source-test.topic=test
> flume1.sources.kafka-source-test.groupId=flume
> flume1.sources.kafka-source-test.interceptors=i1
> flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
> flume1.sources.kafka-source-test.consumer.timeout.ms=100
> flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
> flume1.channels.hdfs-channel-kafka.type=memory
> flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
> flume1.sinks.hdfs-sink-kafka.type=hdfs
> flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
> flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
> flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
> flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
> flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
> flume1.channels.hdfs-channel-kafka.capacity=10
> flume1.channels.hdfs-channel-kafka.transactionCapacity=10
> —
>
> Startup log (less incredibly long path lines):
> —
> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
> Configuration provider starting
> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
> Reloading configuration file:/etc/flume/conf.chef/flume.conf
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks:
> hdfs-sink-kafka Agent: flume1
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume
> configuration contains configuration for agents: [flume1]
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating
> channels
> 16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of
> channel hdfs-channel-kafka type memory
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel
> hdfs-channel-kafka
> 16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of
> source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
> 16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={
> parameters:{interceptors.i1.type=timestamp,
> zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka,
> groupId=flume, consumer.timeout.ms=100, topic=test,
> type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1} }
> 16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink:
> hdfs-sink-kafka, type: hdfs
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel
> hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
> 16/02/04 11:32:07 INFO node.Application: Starting new configuration:{
> sourceRunners:{kafka-source-test=PollableSourceRunner: {
> source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}
> counterGroup:{ name:null counters:{} } }}
> sinkRunners:{hdfs-sink-kafka=SinkRunner: {
> policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{
> name:null counters:{} } }}
> channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name:
> hdfs-channel-kafka}} }
> 16/02/04 11:32:07 INFO node.Application: Starting Channel
> hdfs-channel-kafka
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully
> registered new MBean.
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component
> type: CHANNEL, name: hdfs-channel-kafka started
> 16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
> 16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
> 16/02/04 11:32:07 INFO kafka.KafkaSource: Starting
> org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}...
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: SINK, name: hdfs-sink-kafka: Successfully
> registered new MBean.
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component
> type: SINK, name: hdfs-sink-kafka started
> 16/02/04 11:32:07 INFO mortbay.log: Logging to
> org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via
> org.mortbay.log.Slf4jLog
> 16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
> 16/02/04 11:32:07 INFO mortbay.log: Started
> SelectChannelConnector@0.0.0.0:34545
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
> auto.commit.enable is overridden to false
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
> consumer.timeout.ms is overridden to 10
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is
> overridden to flume
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
> zookeeper.connect is overridden to zk01:2181/mesos-kafka
> 16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at
> zk01:2181/mesos-kafka
> 16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event
> thread.
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:host.name=mesos10
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.version=1.8.0_72-internal
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.vendor=Oracle Corporation
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.io.tmpdir=/tmp
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:java.compiler=<NA>
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:os.name=Linux
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:os.arch=amd64
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:os.version=3.13.0-63-generic
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:user.name=marathon
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
> environment:user.home=/opt/marathon
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection,
> connectString=zk01:2181/mesos-kafka sessionTimeout=6000
> watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to
> server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate
> using SASL (unknown error)
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established
> to 10.100.6.251/10.100.6.251:2181, initiating session
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment
> complete on server 10.100.6.251/10.100.6.251:2181, sessionid =
> 0x152858b1cc07491, negotiated timeout = 6000
> 16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed
> (SyncConnected)
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], begin registering consumer
> flume_mesos10-1454614328204-ca8a74df in ZK
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], end registering consumer
> flume_mesos10-1454614328204-ca8a74df in ZK
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread
> for consumer flume_mesos10-1454614328204-ca8a74df
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer
> flume_mesos10-1454614328204-ca8a74df try #0
> 16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to
> rebalance.
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
> [flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer
> flume_mesos10-1454614328204-ca8a74df try #0
> 16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test
> do started.
> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: SOURCE, name: kafka-source-test: Successfully
> registered new MBean.
> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component
> type: SOURCE, name: kafka-source-test started
> --
>
> --
> Justin Alan Ryan
> Sr. Systems / Release Engineer
> ZipRealty
>