Posted to dev@samza.apache.org by Yi Pan <ni...@gmail.com> on 2015/09/01 08:39:07 UTC

Re: how to run samza container for hello-samza?

Hi, Connie,

The error you saw concerns an internal topic that Samza uses, not any of
your input/output topics:
> __samza_coordinator_wikipedia-stats_1,0

What makes you think that your job starts from the wrong offset? Could you
share your job configuration and the logs? Samza can resume a job from
different offsets depending on its configuration, so it would be easier for
us to debug if you can share the config and logs.
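
The configuration-dependent resume behavior mentioned above can be sketched in Java. This is a simplified, hypothetical model, not Samza's implementation: it assumes checkpointing is enabled and mirrors two Samza settings, systems.<system>.samza.offset.default (oldest or upcoming) and systems.<system>.streams.<stream>.samza.reset.offset. The class and method names are made up, and Kafka's own auto.offset.reset setting is ignored.

```java
import java.util.Optional;

// Hypothetical sketch of how a restarted job might choose where to start
// reading one partition. The precedence between settings is simplified.
public class StartingOffsetSketch {

    // Mirrors the two values of systems.<system>.samza.offset.default.
    enum OffsetDefault { OLDEST, UPCOMING }

    /**
     * @param checkpointed  last checkpointed offset for this partition, if any
     * @param resetOffset   mirrors systems.<system>.streams.<stream>.samza.reset.offset
     * @param offsetDefault mirrors systems.<system>.samza.offset.default
     * @param oldest        oldest offset the broker still retains
     * @param upcoming      offset the next produced message will get
     */
    static long startingOffset(Optional<Long> checkpointed, boolean resetOffset,
                               OffsetDefault offsetDefault, long oldest, long upcoming) {
        if (checkpointed.isPresent() && !resetOffset) {
            // Resume just after the last message recorded in the checkpoint.
            return checkpointed.get() + 1;
        }
        // No usable checkpoint: OLDEST replays retained history, UPCOMING skips it.
        return offsetDefault == OffsetDefault.OLDEST ? oldest : upcoming;
    }

    public static void main(String[] args) {
        // Checkpoint at 41 and no reset: a restart continues at 42.
        System.out.println(startingOffset(Optional.of(41L), false, OffsetDefault.OLDEST, 0L, 100L));
        // Reset requested with offset.default = oldest: everything retained is reprocessed.
        System.out.println(startingOffset(Optional.of(41L), true, OffsetDefault.OLDEST, 0L, 100L));
    }
}
```

Under these assumptions, whether a restart reprocesses messages depends on whether a checkpoint exists and is honored, which is why the job configuration matters for the question.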

Thanks!

On Sun, Aug 30, 2015 at 10:36 PM, Connie Chen <sp...@gmail.com> wrote:

> Thanks Yi, that helps a lot.
>
> For 1), though, I was still using YarnJobFactory; I just found that error
> in the logs when I looked at YARN. Does that mean the job is reprocessing
> messages, and will it start the same way if I restart the grid?
>
> Thanks again,
> Connie
>
> On Sun, Aug 30, 2015 at 10:26 PM, Yi Pan <ni...@gmail.com> wrote:
>
> > Hi, Connie,
> >
> > Let me clarify a bit further:
> > 1. ThreadJobFactory and ProcessJobFactory do not work with any cluster
> > management system (such as YARN). Hence, when you say that you are
> > starting/stopping the grid and using the ThreadJobFactory configuration,
> > that will not work. The only job factory that works with YARN is
> > YarnJobFactory.
> > 2. The rat error is an Apache license check. Make sure that the source
> > files in your project have the Apache license header at the beginning.
> > 3. Samza processes messages in single-threaded mode. Two messages from
> > two different system stream partitions will be delivered to StreamTask in
> > sequence. A join usually means buffering some number of messages from
> > each stream in the KV store. When a new message from stream A arrives,
> > the join task looks up the relevant messages in the other streams'
> > buffers to produce the result.
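
Point 3 can be made concrete with a small Java sketch that does not depend on Samza. It assumes a join on a shared key and uses plain HashMaps as stand-ins for the KV store; the class, method, and stream names ("A", "B") are hypothetical. A real StreamTask would take the stream name and key from the IncomingMessageEnvelope and keep its buffers in a configured store.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Buffer-and-lookup join, processed one message at a time as in a StreamTask.
// The HashMaps stand in for a KV store; nothing is ever evicted here, whereas
// a real job would bound the buffers (for example, by a time window).
public class BufferedJoinSketch {
    // stream name -> (join key -> values buffered so far for that key)
    private final Map<String, Map<String, List<String>>> buffers = new HashMap<>();

    public BufferedJoinSketch(String... streams) {
        for (String stream : streams) {
            buffers.put(stream, new HashMap<>());
        }
    }

    // Handles one incoming message and returns any join results it produces.
    public List<String> process(String stream, String key, String value) {
        // 1. Buffer the new message so later messages on other streams can match it.
        buffers.get(stream).computeIfAbsent(key, k -> new ArrayList<>()).add(value);

        // 2. Look up buffered messages with the same key on every other stream.
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, Map<String, List<String>>> other : buffers.entrySet()) {
            if (other.getKey().equals(stream)) {
                continue;
            }
            for (String match : other.getValue().getOrDefault(key, Collections.emptyList())) {
                joined.add(key + ": " + value + " + " + match);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        BufferedJoinSketch join = new BufferedJoinSketch("A", "B");
        System.out.println(join.process("A", "user1", "click"));   // []
        System.out.println(join.process("B", "user1", "profile")); // [user1: profile + click]
    }
}
```

Because messages arrive one at a time, no locking is needed; each call only has to buffer its own message and consult what the other streams have buffered so far.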
> >
> > Best,
> >
> > -Yi
> >
> > On Sun, Aug 30, 2015 at 10:11 PM, Connie Chen <sp...@gmail.com> wrote:
> >
> > > Hi Yi,
> > >
> > > Thanks for your response. A few more questions:
> > >
> > > 1) What are the semantics of the Kafka topics in hello-samza? (I am
> > > using the offline version that just produces updates in a loop.) That
> > > is, if I do "bin/grid stop all", will the jobs re-read the same messages
> > > from Kafka, or will they read new ones? I get an error like:
> > >
> > > > 2015-08-30 21:55:27 KafkaSystemConsumer [WARN] While refreshing brokers for
> > > > [__samza_coordinator_wikipedia-stats_1,0]: org.apache.samza.SamzaException:
> > > > Already consuming TopicPartition [__samza_coordinator_wikipedia-stats_1,0].
> > > > Retrying.
> > >
> > > when I restart the grid, and I am wondering whether the behavior after
> > > the restart should be the same every time.
> > >
> > > 2) I am using ThreadJobFactory as you suggested for "job.factory.class"
> > > in the stats task, but I get this error:
> > >
> > > > [ERROR] Failed to execute goal org.apache.rat:apache-rat-plugin:0.9:check
> > > > (default) on project hello-samza: Too many files with unapproved license:
> > >
> > > when I try to run mvn package.
> > >
> > > 3) If I have two input streams, is the number of partitions configured
> > > in Kafka, and where can I set which partition a message goes to? Also,
> > > how would I get the two messages from two streams in StreamTask? (It
> > > seems like the IncomingMessageEnvelope that process() provides
> > > represents only one message from one stream, so where would it take in
> > > the two messages sent to the same partition?)
> > >
> > > Thank you!
> > >
> > > Connie
> > >
> > > On Sun, Aug 30, 2015 at 8:34 PM, Yi Pan <ni...@gmail.com> wrote:
> > >
> > > > Hi, Connie,
> > > >
> > > > If I understand your trial example, you were trying to manually launch
> > > > a Samza container via the run-container.sh script. Unfortunately, this
> > > > is only possible via ProcessJobFactory and ThreadJobFactory right now.
> > > > With YARN, you have to start the job via run-job.sh on the RM. The
> > > > SamzaAppMaster will then start the containers automatically.
> > > >
> > > > As for the join example that you are looking for, you should be able
> > > > to configure a job that takes two streams with the same number of
> > > > partitions and uses the default job.systemstreampartition.grouper.factory.
> > > > Then each of your tasks will receive the messages from the same
> > > > partition of both streams, and your code implementing StreamTask should
> > > > perform the join logic.
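
Why equal partition counts matter can be shown with a hedged Java sketch of partition-based grouping, the behavior the default grouper is understood to have (to my knowledge the default factory is org.apache.samza.container.grouper.stream.GroupByPartitionFactory). Ssp is a simplified, hypothetical stand-in for Samza's SystemStreamPartition, and the stream names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of partition-based grouping: input partitions that share a partition
// number end up in the same task. Ssp is a hypothetical stand-in for
// SystemStreamPartition; the real grouper's task naming is not modeled.
public class GroupByPartitionSketch {
    static final class Ssp {
        final String system;
        final String stream;
        final int partition;

        Ssp(String system, String stream, int partition) {
            this.system = system;
            this.stream = stream;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return system + "." + stream + "#" + partition;
        }
    }

    // Maps partition number -> the input partitions handled by that task.
    static Map<Integer, List<Ssp>> group(List<Ssp> ssps) {
        Map<Integer, List<Ssp>> tasks = new TreeMap<>();
        for (Ssp ssp : ssps) {
            tasks.computeIfAbsent(ssp.partition, p -> new ArrayList<>()).add(ssp);
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<Ssp> inputs = Arrays.asList(
                new Ssp("kafka", "clicks", 0), new Ssp("kafka", "clicks", 1),
                new Ssp("kafka", "profiles", 0), new Ssp("kafka", "profiles", 1));
        // Each task sees partition N of both streams, so it can join them locally.
        System.out.println(group(inputs));
        // {0=[kafka.clicks#0, kafka.profiles#0], 1=[kafka.clicks#1, kafka.profiles#1]}
    }
}
```

If one stream had more partitions than the other, its extra partitions would form tasks containing only that stream, with nothing to join against. Messages that should join must also be written to the same partition number in both topics, typically by partitioning both on the join key.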
> > > >
> > > > Hope that explains the procedure at a high level. Feel free to ask if
> > > > you have further questions.
> > > >
> > > > Thanks!
> > > >
> > > > -Yi
> > > >
> > > > On Sun, Aug 30, 2015 at 5:57 PM, Connie Chen <sp...@gmail.com> wrote:
> > > >
> > > > > I am relatively new to Samza as well as YARN/ZooKeeper/Kafka. I went
> > > > > through the hello-samza
> > > > > <http://samza.apache.org/startup/hello-samza/latest/> example and was
> > > > > wondering if there was more documentation or tutorials about
> > > > > SamzaContainer.
> > > > >
> > > > > There are some files under samza.examples.wikipedia.system in
> > > > > hello-samza; it looks like they are using the container API (from
> > > > > reading here
> > > > > <http://samza.apache.org/learn/documentation/latest/container/samza-container.html>
> > > > > and here
> > > > > <http://samza.apache.org/learn/documentation/latest/container/streams.html>),
> > > > > and I can't figure out how to run them.
> > > > >
> > > > > I tried bin/run-container.sh but I get this number format exception:
> > > > >
> > > > > > java.lang.NumberFormatException: null
> > > > > > at java.lang.Integer.parseInt(Integer.java:454)
> > > > > > at java.lang.Integer.parseInt(Integer.java:527)
> > > > > > at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
> > > > > > at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> > > > > > at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:82)
> > > > > > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:69)
> > > > > > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
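
The innermost frames show Scala's toInt, which delegates to Integer.parseInt, being handed a null string during SamzaContainer startup. A plausible explanation, assumed here rather than confirmed in this thread, is that the container reads a value such as its ID from an environment variable that the job's launcher normally sets, so invoking run-container.sh by hand leaves it unset. This Java sketch reproduces the same exception; the variable name is hypothetical.

```java
// Reproduces the top of the trace above: parsing a null string as an int.
// HYPOTHETICAL_CONTAINER_ID_VAR is a made-up name standing in for whatever
// variable the launcher is assumed to set before starting the container.
public class MissingEnvVarSketch {
    static int containerIdFromEnv(String varName) {
        String raw = System.getenv(varName); // null if the variable is not set
        return Integer.parseInt(raw);        // throws NumberFormatException for null
    }

    public static void main(String[] args) {
        try {
            System.out.println("Container ID: " + containerIdFromEnv("HYPOTHETICAL_CONTAINER_ID_VAR"));
        } catch (NumberFormatException e) {
            // The message text depends on the JDK; older JDKs, like the one in
            // the trace, report just "null".
            System.out.println("Caught " + e);
        }
    }
}
```

This is consistent with the reply that, under YARN, containers should be started by the SamzaAppMaster after run-job.sh rather than launched manually.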
> > > > >
> > > > >
> > > > > Does anyone have tips or suggestions for how I can play around with
> > > > > this more? Mainly, I would like to try partitioning and see an example
> > > > > of joining two streams with the embedded key-value store.
> > > > >
> > > > > Thanks in advance!
> > > > >
> > > > > Connie
> > > > >