Posted to dev@samza.apache.org by Job-Selina Wu <sw...@gmail.com> on 2015/07/24 03:33:18 UTC

Samza: can not produce new data to kafka

Hi,  All

     I am trying to write my first StreamTask class. I have a topic in
Kafka called "http-demo", and I would like to read that topic and write it to
another topic called "demo-duplicate".

    However, no topic is written to Kafka.

    My properties file and StreamTask are below. Can anyone tell me what
the bug is?
    BTW, if I configure checkpointing or metrics in the properties file, the
checkpoint and metrics topics are written to Kafka, and the content of the
input topic -- http-demo -- is shown correctly.
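
(For reference, the checkpoint and metrics settings mentioned here look roughly
like the sketch below for Samza 0.9.x; the replication factor and the "metrics"
stream name are assumptions, not values taken from this job.)

# Checkpointing (sketch; Kafka checkpoint manager shipped with Samza)
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.checkpoint.replication.factor=1

# Metrics snapshot reporter (sketch; the "metrics" topic name is an arbitrary choice)
metrics.reporters=snapshot
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics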

Your help is highly appreciated.

Sincerely,
Selina


- - -- - - - - -
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=demo-parser

# YARN
yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz

# Task
task.class=samza.http.demo.task.HttpDemoParserStreamTask
task.inputs=kafka.http-demo

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.bootstrap.servers=localhost:9092
- - -- - - - - -

My StreamTask class is also simple:

---------

package samza.http.demo.task;

import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Read data from the http-demo topic and write it back to "demo-duplicate".
 */
public class HttpDemoParserStreamTask implements StreamTask {

    private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "demo-duplicate");
    Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class);

    @SuppressWarnings("unchecked")
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) throws Exception {

        String key = (String) envelope.getKey();
        String message = envelope.getMessage().toString();
        logger.info("key="+key+": message="+message);

        // Send the incoming message map to the output stream; the commented-out
        // line below sends it as a String instead.
        Map<String, String> outgoingMap = (Map<String, String>) (envelope.getMessage());
        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
        //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
    }

}

-------

Re: Samza: can not produce new data to kafka

Posted by Job-Selina Wu <sw...@gmail.com>.
Hi, Yan:

        Thanks for replying to my email in detail. The relevant files from the
YARN logs are shown below. There is no exception under samza-Demo/deploy/yarn/logs.
I guess the StreamTask was never called ...


Part of the container log
(samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_000001/stderr)
is pasted below. In short, the line logged by logger.info("key="+key+":
message="+message); was not generated.

/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java
-server -Dsamza.container.name=samza-application-master
-Dlog4j.configuration=file:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/log4j.xml
-Dsamza.log.dir=/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_000001
-Djava.io.tmpdir=/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/tmp
-Xmx768M -XX:+PrintGCDateStamps
-Xloggc:/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_000001/gc.log
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=10241024 -d64 -cp
/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/etc/hadoop:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/activation-1.1.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/akka-actor_2.10-2.1.2.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/aopalliance-1.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/asm-3.1.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/avro-1.7.4.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/commons-beanutils-1.7.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/commons-beanutils-core-1.8.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/commons-cli-1.2


The gc.log.0.current file shows allocation failures and full GC events:

CommandLine flags: -XX:GCLogFileSize=10241024 -XX:InitialHeapSize=268435456
-XX:MaxHeapSize=805306368 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC
-XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
-XX:+UseCompressedClassPointers -XX:+UseCompressedOops
-XX:+UseGCLogFileRotation -XX:+UseParallelGC

2015-07-24T13:28:56.901+0800: 0.694: [GC (Allocation Failure) 65536K->8449K(251392K), 0.0062314 secs]
2015-07-24T13:28:57.188+0800: 0.981: [GC (System.gc()) 39240K->6305K(251392K), 0.0047744 secs]
2015-07-24T13:28:57.193+0800: 0.986: [Full GC (System.gc()) 6305K->5940K(251392K), 0.0147206 secs]
2015-07-24T13:28:57.625+0800: 1.418: [GC (Allocation Failure) 71476K->12511K(251392K), 0.0030179 secs]
2015-07-24T13:28:59.889+0800: 3.682: [GC (Allocation Failure) 78047K->13859K(251392K), 0.0052610 secs]
2015-07-24T13:29:15.487+0800: 19.280: [GC (Metadata GC Threshold) 35659K->10106K(251392K), 0.0036350 secs]
2015-07-24T13:29:15.490+0800: 19.284: [Full GC (Metadata GC Threshold) 10106K->7318K(149504K), 0.0200118 secs]



[image: Inline image 1]

   Your help is highly appreciated.

Sincerely,
Selina


Re: Samza: can not produce new data to kafka

Posted by Yan Fang <ya...@gmail.com>.
{quote}
 I did not set auto.create.topics.enable anywhere
{quote}

Fine. Then it defaults to true. No worries.

{quote}
My job is listed as below. However I am wondering how can I know if my
method "public void* process*(IncomingMessageEnvelope envelope,
MessageCollector collector, TaskCoordinator coordinator)" was run or not.
{quote}

If you have logging enabled (from the code, you do), you can check the
container's log to see if it has the output. Assuming you are using the local
YARN that hello-samza provides, you should be able to check the logs in
deploy/yarn/userlogs/application_Id.

If you use a print/stdout approach, you can see the result in the sysout file
under deploy/yarn/userlogs/application_Id (if the StreamTask works).

If it does not work, you can check the logs in
deploy/yarn/userlogs/application_Id as well to see the exceptions, if there
are any.
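
(A quick way to do that check, sketched below using the hello-samza log layout
from earlier in this thread; the application id is just the example that appears
in the logs above:)

# Look for the process() log line and for any exceptions in the container logs
grep -R "key=" deploy/yarn/logs/userlogs/application_1437767867729_0001/
grep -Ri "exception" deploy/yarn/logs/userlogs/application_1437767867729_0001/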

Thanks,

Fang, Yan
yanfang724@gmail.com


Re: Samza: can not produce new data to kafka

Posted by Job-Selina Wu <sw...@gmail.com>.
Hi, Yan and Shadi:

    I made a mistake. Actually, there is no log under /tmp/kafka-logs created
by "logger.info("key="+key+": message="+message);". The log I provided is
actually the log for the input topic "http-demo" at /tmp/kafka-logs/http-demo-0.

    My job configuration is listed below. However, I am wondering how I can tell
whether my method "public void process(IncomingMessageEnvelope envelope,
MessageCollector collector, TaskCoordinator coordinator)" was run or not.

    I manually created the topic "demo-duplicate" from the command line;
otherwise it would be created by the Samza code.
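
(For reference, creating the topic manually with the Kafka tooling bundled in
hello-samza would look roughly like the sketch below; the partition count and
replication factor are assumptions:)

deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create \
  --topic demo-duplicate --partitions 1 --replication-factor 1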

    I checked that I did not set auto.create.topics.enable anywhere. Attached
is my properties file for Kafka.


   Your help is highly appreciated

Sincerely,
Selina

[image: Inline image 1]





Re: Samza: can not produce new data to kafka

Posted by Yan Fang <ya...@gmail.com>.
The code and the properties seem good to me. collector.send(new
OutgoingMessageEnvelope(OUTPUT_STREAM, message)); should work. So I am
curious whether you accidentally disabled auto.create.topics.enable ... Can
you also try to send messages from the command line to "demo-duplicate" to
see whether it gets anything.
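
(A minimal sketch of that test, assuming the hello-samza layout and the broker
and ZooKeeper addresses from the properties file earlier in this thread:)

# Write one test message to demo-duplicate, then read it back
echo "test message" | deploy/kafka/bin/kafka-console-producer.sh \
  --broker-list localhost:9092 --topic demo-duplicate
deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --from-beginning --topic demo-duplicate

# Check whether the broker config disables topic auto-creation
# (path per the hello-samza layout; an assumption)
grep auto.create.topics.enable deploy/kafka/config/server.properties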

Let me know if it works.

Thanks,

Fang, Yan
yanfang724@gmail.com


Re: Samza: can not produce new data to kafka

Posted by Job-Selina Wu <sw...@gmail.com>.
Hi, Shadi:

      Thanks a lot for your reply.
1. There are no error logs from Kafka or Samza.

2. This line, "logger.info("key="+key+": message="+message);", writes the
log correctly, as shown below:

[image: Inline image 1]

These are my last two messages, with the right count.

3. I tried both ways below; neither of them creates the topic, but I will
try again.

collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));

//collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));

4. I wrote a topic called "http-demo" to Kafka as my input, and its content
can be shown with the command line below, so Kafka should be OK.
deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
--from-beginning --topic http-demo

Your help is highly appreciated.

Sincerely,
Selina





Re: Samza: can not produce new data to kafka

Posted by Shadi Noghabi <sn...@linkedin.com.INVALID>.
Selina,

You should probably check a few things:
1. Your log files, to see if you have any errors. Also, does your job fail
or continue running?
2. Does this line, "logger.info("key="+key+": message="+message);", write
any logs?
3. This might not be the only reason, but you are sending messages of type
Map<String, String>. However, in your config file you defined
"systems.kafka.samza.msg.serde=string", which expects the message to be a
String.
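
(One way to make the send consistent with that setting, sketched under the
assumption that the string serde is kept, is to forward the message as a
String, as in the commented-out line of the task:)

    // Sketch: send the message as a String so it matches
    // systems.kafka.samza.msg.serde=string
    String message = envelope.getMessage().toString();
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));

(Alternatively, a map-capable serde could be registered just for the output
stream; the key names below follow Samza's per-stream serde configuration and
should be treated as a sketch, not as settings taken from this job:)

serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
systems.kafka.streams.demo-duplicate.samza.msg.serde=json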


Shadi

