You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by "Bae, Jae Hyeon" <me...@gmail.com> on 2015/01/21 22:14:13 UTC

Question on standalone Samza distribution and concerns on container leak

Hi Samza Devs

The significant concern I got recently is, container leak. The data
pipeline based on Samza can guarantee at least once delivery but the
duplicate rate is over 1.0%, I am having alerts right now. Container leaks
will push a lot of alerts to me.

So, we need to find out running Samza on Mesos won't create that problem,
or Spark Streaming won't have that issue. In the worst case, creating our
own distribution coordination might be more predictable instead of running
Yarn on EMR.

What about standalone Samza? If this is quite plausible and the best
solution in the near future, I want to be able to contribute. Could you
share your thoughts or plans?

I really appreciate if you give me some guideline about implementing custom
cluster management interface of Samza. If it's possible, I want to take a
look to replace Yarn support with EC2 ASG stuff.

Thank you
Best, Jae

Re: Question on standalone Samza distribution and concerns on container leak

Posted by Chris Riccomini <cr...@apache.org>.
Hey Jae,

No problem. This is actually an interesting study. :) Please post your
findings with Mesos.

Cheers,
Chris

On Thu, Jan 22, 2015 at 1:53 PM, Bae, Jae Hyeon <me...@gmail.com> wrote:

> Thank you so much. This was really helpful!
>
> On Thu, Jan 22, 2015 at 1:04 PM, Chris Riccomini <cr...@apache.org>
> wrote:
>
>> Hey Jae,
>>
>> Here are my results from testing failure scenario (1), above. I started
>> hello-samza, ran a job, and then killed the RM. The NM hung around for a
>> while, and then killed the orphaned containers, and itself:
>>
>> NM trying to reconnect to dead (or partitioned) RM:
>>
>> criccomi-mn:incubator-samza-hello-samza criccomi$ jps
>> 1650 SamzaAppMaster
>> 1350 Kafka
>> 1687 SamzaContainer
>> 1321 NodeManager
>> 461
>> 1902 Jps
>> 1247 QuorumPeerMain
>>
>> NM decides to kill all of its containers, and itself:
>>
>> criccomi-mn:incubator-samza-hello-samza criccomi$ jps
>> 1925 Jps
>> 1350 Kafka
>> 461
>> 1247 QuorumPeerMain
>>
>> Here are the logs from the NM after killing the RM:
>>
>> 2015-01-22 12:33:22,611 INFO org.apache.hadoop.ipc.Client: Retrying
>> connect
>> to server: localhost/127.0.0.1:8031. Already tried 0 time(s); retry
>> policy
>> is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
>> MILLISECONDS)
>> 2015-01-22 12:33:23,612 INFO org.apache.hadoop.ipc.Client: Retrying
>> connect
>> to server: localhost/127.0.0.1:8031. Already tried 1 time(s); retry
>> policy
>> is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
>> MILLISECONDS)
>> 2015-01-22 12:33:24,613 INFO org.apache.hadoop.ipc.Client: Retrying
>> connect
>> to server: localhost/127.0.0.1:8031. Already tried 2 time(s); retry
>> policy
>> is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
>> MILLISECONDS)
>> 2015-01-22 12:33:25,615 INFO org.apache.hadoop.ipc.Client: Retrying
>> connect
>> to server: localhost/127.0.0.1:8031. Already tried 3 time(s); retry
>> policy
>> is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
>> MILLISECONDS)
>> ....
>> 2015-01-22 12:52:50,096 INFO org.apache.hadoop.ipc.Client: Retrying
>> connect
>> to server: localhost/127.0.0.1:8031. Already tried 7 time(s); retry
>> policy
>> is RetryUpToMaximumCountWithFixedSleep(maxRet
>> ries=10, sleepTime=1000 MILLISECONDS)
>> 2015-01-22 12:52:51,097 INFO org.apache.hadoop.ipc.Client: Retrying
>> connect
>> to server: localhost/127.0.0.1:8031. Already tried 8 time(s); retry
>> policy
>> is RetryUpToMaximumCountWithFixedSleep(maxRet
>> ries=10, sleepTime=1000 MILLISECONDS)
>> 2015-01-22 12:52:52,098 INFO org.apache.hadoop.ipc.Client: Retrying
>> connect
>> to server: localhost/127.0.0.1:8031. Already tried 9 time(s); retry
>> policy
>> is RetryUpToMaximumCountWithFixedSleep(maxRet
>> ries=10, sleepTime=1000 MILLISECONDS)
>>
>> You can see that the NM ran for 20 minutes. I believe this is tunable with
>> configs in:
>>
>>
>>
>> https://hadoop.apache.org/docs/r2.4.0/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
>>
>> Once the final timeout happens, the NM shuts all containers down, and
>> kills
>> itself:
>>
>> 2015-01-22 12:52:52,217 INFO
>>
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
>> Applications still running : [application_1421958559415_0001]
>> 2015-01-22 12:52:52,219 INFO
>>
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
>> Waiting for Applications to be Finished
>> 2015-01-22 12:52:52,220 INFO
>>
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application:
>> Application application_1421958559415_0001 transitioned from RUNNING to
>> FINISHING_C
>> ONTAINERS_WAIT2015-01-22 12:52:52,220 INFO
>>
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
>> Container container_1421958559415_0001_01_000002 transitioned from RUNNING
>> to KILLING
>> 2015-01-22 12:52:52,220 INFO
>>
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
>> Container container_1421958559415_0001_01_000001 transitioned from RUNNING
>> to KILLING
>> 2015-01-22 12:52:52,220 INFO
>>
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
>> Cleaning up container container_1421958559415_0001_01_000002
>> 2015-01-22 12:52:52,254 INFO
>>
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
>> Cleaning up container container_1421958559415_0001_01_000001
>> 2015-01-22 12:52:52,508 WARN
>> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
>> code from container container_1421958559415_0001_01_000002 is : 137
>>
>> When we implement samza-standalone, we will probably have to follow a very
>> similar procedure. If we detect a network split, we'll retry for a little
>> while, and then kill all containers to avoid having duplicates. I am
>> willing to bet that Mesos slaves follow exactly the same behavior when
>> they
>> can't contact the master.
>>
>> What I'm getting at here is that I think that this is pretty unavoidable.
>> The best you can do is wait a little while, and then kill the duplicate
>> (orphaned) containers.
>>
>> Cheers,
>> Chris
>>
>> On Thu, Jan 22, 2015 at 12:42 PM, Chris Riccomini <cr...@apache.org>
>> wrote:
>>
>> > Hey Jae,
>> >
>> > Every resource manager has to solve the split-brain/orphaned container
>> > problem. There are several issues to check:
>> >
>> > 1. Simulate a network partition between the master (RM in YARN) and
>> slave
>> > (NM in YARN).
>> > 2. `kill -9` the slave (NM in YARN).
>> >
>> > In YARN's case, I know for sure that (2) will result in the containers
>> > being leaked. The PPID on the container will be switched to 1. This is
>> just
>> > how UNIX works. I suspect `kill -9`'ing the slave in Mesos will result
>> in
>> > the same behavior.
>> >
>> > For (1), every distributed system has to solve this. How do you detect a
>> > real partition (vs. a long GC, for example), and when you do detect a
>> > partition, how do you react to it.
>> >
>> > I am testing (1) for YARN right now (using hello-samza, and killing the
>> > RM). I will let you know how it behaves shortly. I believe it retries to
>> > connect to the RM for some period of time, and then the NM kills itself
>> if
>> > it can't. If this is the case, then the container *would not be
>> orphaned*.
>> > I also believe the retry count and wait time is tunable, so you can
>> define
>> > your own exposure (e.g. you have a duplicate container for 1 minute,
>> before
>> > the NM shuts itself down).
>> >
>> > Anecdotally, we've not seen leaked containers in YARN since we began
>> > properly shutting down NMs (not kill -9'ing them).
>> >
>> > > Depending on the time line among stabilizing stand alone and Mesos
>> > support
>> >
>> > Regarding stabilizing standalone, I'm working on the design doc right
>> now.
>> > A proposed sketch of a ZK-based implementation was posted on SAMZA-516
>> > yesterday. My goal is to get the design doc done by tomorrow. This would
>> > let us discuss and open subtasks next week, and start coding thereafter.
>> > Realistically, I think standalone can be committed before end of Q1, and
>> > should be usable. After a month or two of operation, I'd wager it'll be
>> > relatively stable. So, that puts things at mid-Q2.
>> >
>> > Cheers,
>> > Chris
>> >
>> > On Thu, Jan 22, 2015 at 12:24 PM, Bae, Jae Hyeon <me...@gmail.com>
>> > wrote:
>> >
>> >> I read through SAMZA-375. We will do one more round PoC Samza on Mesos.
>> >>
>> >> On Thu, Jan 22, 2015 at 11:14 AM, Bae, Jae Hyeon <me...@gmail.com>
>> >> wrote:
>> >>
>> >> > I asked Mantis guy about orphaned container in Mesos and he was
>> almost
>> >> > sure that Mesos won't let that happen.
>> >> >
>> >> > How is https://issues.apache.org/jira/browse/SAMZA-375 going?
>> Depending
>> >> > on the time line among stabilizing stand alone and Mesos support, our
>> >> > schedule or decision will be changed.
>> >> >
>> >> > Thank you
>> >> > Best, Jae
>> >> >
>> >> > On Wed, Jan 21, 2015 at 4:58 PM, Chris Riccomini <
>> >> > criccomini@linkedin.com.invalid> wrote:
>> >> >
>> >> >> Hey all,
>> >> >>
>> >> >> Also, just opened this ticket to track work on samza-standalone:
>> >> >>
>> >> >>   https://issues.apache.org/jira/browse/SAMZA-516
>> >> >>
>> >> >> Cheers,
>> >> >> Chris
>> >> >>
>> >> >> On 1/21/15 1:32 PM, "Chris Riccomini" <cr...@linkedin.com>
>> wrote:
>> >> >>
>> >> >> >Hey Jae,
>> >> >> >
>> >> >> >> So, we need to find out running Samza on Mesos won't create that
>> >> >> >>problem, or Spark Streaming won't have that issue. In the worst
>> case,
>> >> >> >>creating our own distribution coordination might be more
>> predictable
>> >> >> >>instead of running Yarn on EMR.
>> >> >> >
>> >> >> >I think that there are two ways to fix this. One is to have the
>> Kafka
>> >> >> >broker detect that there are two producers that are "the same", and
>> >> start
>> >> >> >dropping messages from the "old one" (and perhaps throw an
>> exception
>> >> to
>> >> >> >the old producer). The other way is to have the Samza container
>> detect
>> >> >> the
>> >> >> >problem, and kill itself.
>> >> >> >
>> >> >> >The kafka-based approach is a subset of the transactionality
>> feature
>> >> >> >described here:
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >>
>> >>
>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+
>> >> >> >i
>> >> >> >n+Kafka
>> >> >> >
>> >> >> >The problem with the Kafka approach is that 1) it's kafka-specific,
>> >> and
>> >> >> 2)
>> >> >> >the generation id required to drop messages from an orphaned
>> producer
>> >> >> >hasn't been implemented, except in a branch that's not been
>> committed.
>> >> >> >
>> >> >> >So, if we accept that we shouldn't use Kafka as the solution for
>> >> >> detecting
>> >> >> >orphaned containers, the solution will have to go into Samza.
>> Within
>> >> >> >Samza, there are two approaches. One is to use the resource
>> scheduler
>> >> >> >(YARN, Mesos, etc.) to detect the problem. The other solution is to
>> >> use
>> >> >> >Samza, itself, to detect the problem.
>> >> >> >
>> >> >> >A YARN-specific example of how to solve the problem would be to
>> have
>> >> the
>> >> >> >SamzaContainer periodically poll its local NM's REST endpoint:
>> >> >> >
>> >> >> >  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
>> >> >> >
>> >> >> >To see what the status is, its last update time, etc. If the REST
>> >> >> endpoint
>> >> >> >can't be reached, the node is unhealthy, or the last update time
>> is >
>> >> >> some
>> >> >> >time interval, the container could kill itself. Again, this is
>> >> >> >YARN-specific.
>> >> >> >
>> >> >> >I am not sure how Mesos handles split-brain. I've asked Tim Chen on
>> >> >> >SAMZA-375:
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >>
>> >>
>> https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&
>> >> >> >p
>> >> >>
>> >> >>
>> >>
>> >age=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comme
>> >> >> >n
>> >> >> >t-14286204
>> >> >> >
>> >> >> >The last solution that I mentioned, using Samza directly (no
>> >> dependency
>> >> >> on
>> >> >> >Kafka, YARN, Mesos, etc), seems like the best long-term solution to
>> >> me.
>> >> >> We
>> >> >> >can either 1) introduce a heartbeat message into the coordinator
>> >> stream,
>> >> >> >or 2) use the existing checkpoint message as a heartbeat.  There is
>> >> some
>> >> >> >complexity to this solution that would need to be thought through,
>> >> >> though.
>> >> >> >For example, should the heartbeat messages be sent from the main
>> >> thread?
>> >> >> >What happens if the main thread is blocked on process() for an
>> >> extended
>> >> >> >period of time?
>> >> >> >
>> >> >> >What do others think? As a short-term fix, it seems to me like
>> >> YARN/Mesos
>> >> >> >should handle this automatically for us. Has anyone had experience
>> >> with
>> >> >> >orphaned containers in Mesos?
>> >> >> >
>> >> >> >> I really appreciate if you give me some guideline about
>> implementing
>> >> >> >>custom cluster management interface of Samza.
>> >> >> >
>> >> >> >Samza jobs are started through bin/run-job.sh (inside samza-shell).
>> >> This
>> >> >> >CLI uses JobRunner to instantiate a StreamJobFactory (defined with
>> >> >> >job.factory.class), which returns a StreamJob. To implement your
>> own
>> >> >> >cluster management, the first thing you'll need to do is implement
>> >> >> >StreamJobFactory and StreamJob. You can have a look at YarnJob or
>> >> >> >ProcessJob/ProcessJobFactory for an example of how to do this.
>> >> >> >
>> >> >> >Note that this code has changed slightly between 0.8.0 and master
>> >> >> (0.9.0).
>> >> >> >In 0.9.0, the partition-to-container assignment logic has been
>> pulled
>> >> out
>> >> >> >of YARN's AM, and into a JobCoordinator class.
>> >> >> >
>> >> >> >The trick with adding EC2 ASG is going to be in handling partition
>> >> >> >shifting when a new node is added to the group. For example, if you
>> >> have
>> >> >> >two machines, each running one container, and you add a third
>> machine,
>> >> >> >some of the input partitions (and corresponding StreamTasks) need
>> to
>> >> be
>> >> >> >shifted from the two machines on to the third. The only way to do
>> this
>> >> >> >right now is to:
>> >> >> >
>> >> >> >1. Stop all containers.
>> >> >> >2. Re-instantiate the JobCoordinator with a new container count.
>> >> >> >3. Start new containers on all three machines with the new
>> partition
>> >> >> >assignments.
>> >> >> >
>> >> >> >In an ideal world, steps (1-3) would be handled automatically by
>> >> Samza,
>> >> >> >and wouldn't require container restarts. This is precisely what
>> >> >> >samza-standalone will accomplish. If you're interested in
>> >> contributing to
>> >> >> >samza-standalone, that would be awesome. I'm working on a design
>> doc
>> >> >> right
>> >> >> >now, which I'm trying to post by EOW. Once that's done, we can
>> >> >> collaborate
>> >> >> >on design and split the code up, if you'd like.
>> >> >> >
>> >> >> >
>> >> >> >Cheers,
>> >> >> >Chris
>> >> >> >
>> >> >> >On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <me...@gmail.com> wrote:
>> >> >> >
>> >> >> >>Hi Samza Devs
>> >> >> >>
>> >> >> >>The significant concern I got recently is, container leak. The
>> data
>> >> >> >>pipeline based on Samza can guarantee at least once delivery but
>> the
>> >> >> >>duplicate rate is over 1.0%, I am having alerts right now.
>> Container
>> >> >> >>leaks
>> >> >> >>will push a lot of alerts to me.
>> >> >> >>
>> >> >> >>So, we need to find out running Samza on Mesos won't create that
>> >> >> problem,
>> >> >> >>or Spark Streaming won't have that issue. In the worst case,
>> creating
>> >> >> our
>> >> >> >>own distribution coordination might be more predictable instead of
>> >> >> >>running
>> >> >> >>Yarn on EMR.
>> >> >> >>
>> >> >> >>What about standalone Samza? If this is quite plausible and the
>> best
>> >> >> >>solution in the near future, I want to be able to contribute.
>> Could
>> >> you
>> >> >> >>share your thoughts or plans?
>> >> >> >>
>> >> >> >>I really appreciate if you give me some guideline about
>> implementing
>> >> >> >>custom
>> >> >> >>cluster management interface of Samza. If it's possible, I want to
>> >> take
>> >> >> a
>> >> >> >>look to replace Yarn support with EC2 ASG stuff.
>> >> >> >>
>> >> >> >>Thank you
>> >> >> >>Best, Jae
>> >> >> >
>> >> >>
>> >> >>
>> >> >
>> >>
>> >
>> >
>>
>
>

Re: Question on standalone Samza distribution and concerns on container leak

Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
Thank you so much. This was really helpful!

On Thu, Jan 22, 2015 at 1:04 PM, Chris Riccomini <cr...@apache.org>
wrote:

> Hey Jae,
>
> Here are my results from testing failure scenario (1), above. I started
> hello-samza, ran a job, and then killed the RM. The NM hung around for a
> while, and then killed the orphaned containers, and itself:
>
> NM trying to reconnect to dead (or partitioned) RM:
>
> criccomi-mn:incubator-samza-hello-samza criccomi$ jps
> 1650 SamzaAppMaster
> 1350 Kafka
> 1687 SamzaContainer
> 1321 NodeManager
> 461
> 1902 Jps
> 1247 QuorumPeerMain
>
> NM decides to kill all of its containers, and itself:
>
> criccomi-mn:incubator-samza-hello-samza criccomi$ jps
> 1925 Jps
> 1350 Kafka
> 461
> 1247 QuorumPeerMain
>
> Here are the logs from the NM after killing the RM:
>
> 2015-01-22 12:33:22,611 INFO org.apache.hadoop.ipc.Client: Retrying connect
> to server: localhost/127.0.0.1:8031. Already tried 0 time(s); retry policy
> is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2015-01-22 12:33:23,612 INFO org.apache.hadoop.ipc.Client: Retrying connect
> to server: localhost/127.0.0.1:8031. Already tried 1 time(s); retry policy
> is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2015-01-22 12:33:24,613 INFO org.apache.hadoop.ipc.Client: Retrying connect
> to server: localhost/127.0.0.1:8031. Already tried 2 time(s); retry policy
> is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> 2015-01-22 12:33:25,615 INFO org.apache.hadoop.ipc.Client: Retrying connect
> to server: localhost/127.0.0.1:8031. Already tried 3 time(s); retry policy
> is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
> MILLISECONDS)
> ....
> 2015-01-22 12:52:50,096 INFO org.apache.hadoop.ipc.Client: Retrying connect
> to server: localhost/127.0.0.1:8031. Already tried 7 time(s); retry policy
> is RetryUpToMaximumCountWithFixedSleep(maxRet
> ries=10, sleepTime=1000 MILLISECONDS)
> 2015-01-22 12:52:51,097 INFO org.apache.hadoop.ipc.Client: Retrying connect
> to server: localhost/127.0.0.1:8031. Already tried 8 time(s); retry policy
> is RetryUpToMaximumCountWithFixedSleep(maxRet
> ries=10, sleepTime=1000 MILLISECONDS)
> 2015-01-22 12:52:52,098 INFO org.apache.hadoop.ipc.Client: Retrying connect
> to server: localhost/127.0.0.1:8031. Already tried 9 time(s); retry policy
> is RetryUpToMaximumCountWithFixedSleep(maxRet
> ries=10, sleepTime=1000 MILLISECONDS)
>
> You can see that the NM ran for 20 minutes. I believe this is tunable with
> configs in:
>
>
>
> https://hadoop.apache.org/docs/r2.4.0/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
>
> Once the final timeout happens, the NM shuts all containers down, and kills
> itself:
>
> 2015-01-22 12:52:52,217 INFO
>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
> Applications still running : [application_1421958559415_0001]
> 2015-01-22 12:52:52,219 INFO
>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
> Waiting for Applications to be Finished
> 2015-01-22 12:52:52,220 INFO
>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application:
> Application application_1421958559415_0001 transitioned from RUNNING to
> FINISHING_C
> ONTAINERS_WAIT2015-01-22 12:52:52,220 INFO
>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
> Container container_1421958559415_0001_01_000002 transitioned from RUNNING
> to KILLING
> 2015-01-22 12:52:52,220 INFO
>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
> Container container_1421958559415_0001_01_000001 transitioned from RUNNING
> to KILLING
> 2015-01-22 12:52:52,220 INFO
>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
> Cleaning up container container_1421958559415_0001_01_000002
> 2015-01-22 12:52:52,254 INFO
>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
> Cleaning up container container_1421958559415_0001_01_000001
> 2015-01-22 12:52:52,508 WARN
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
> code from container container_1421958559415_0001_01_000002 is : 137
>
> When we implement samza-standalone, we will probably have to follow a very
> similar procedure. If we detect a network split, we'll retry for a little
> while, and then kill all containers to avoid having duplicates. I am
> willing to bet that Mesos slaves follow exactly the same behavior when they
> can't contact the master.
>
> What I'm getting at here is that I think that this is pretty unavoidable.
> The best you can do is wait a little while, and then kill the duplicate
> (orphaned) containers.
>
> Cheers,
> Chris
>
> On Thu, Jan 22, 2015 at 12:42 PM, Chris Riccomini <cr...@apache.org>
> wrote:
>
> > Hey Jae,
> >
> > Every resource manager has to solve the split-brain/orphaned container
> > problem. There are several issues to check:
> >
> > 1. Simulate a network partition between the master (RM in YARN) and slave
> > (NM in YARN).
> > 2. `kill -9` the slave (NM in YARN).
> >
> > In YARN's case, I know for sure that (2) will result in the containers
> > being leaked. The PPID on the container will be switched to 1. This is
> just
> > how UNIX works. I suspect `kill -9`'ing the slave in Mesos will result in
> > the same behavior.
> >
> > For (1), every distributed system has to solve this. How do you detect a
> > real partition (vs. a long GC, for example), and when you do detect a
> > partition, how do you react to it.
> >
> > I am testing (1) for YARN right now (using hello-samza, and killing the
> > RM). I will let you know how it behaves shortly. I believe it retries to
> > connect to the RM for some period of time, and then the NM kills itself
> if
> > it can't. If this is the case, then the container *would not be
> orphaned*.
> > I also believe the retry count and wait time is tunable, so you can
> define
> > your own exposure (e.g. you have a duplicate container for 1 minute,
> before
> > the NM shuts itself down).
> >
> > Anecdotally, we've not seen leaked containers in YARN since we began
> > properly shutting down NMs (not kill -9'ing them).
> >
> > > Depending on the time line among stabilizing stand alone and Mesos
> > support
> >
> > Regarding stabilizing standalone, I'm working on the design doc right
> now.
> > A proposed sketch of a ZK-based implementation was posted on SAMZA-516
> > yesterday. My goal is to get the design doc done by tomorrow. This would
> > let us discuss and open subtasks next week, and start coding thereafter.
> > Realistically, I think standalone can be committed before end of Q1, and
> > should be usable. After a month or two of operation, I'd wager it'll be
> > relatively stable. So, that puts things at mid-Q2.
> >
> > Cheers,
> > Chris
> >
> > On Thu, Jan 22, 2015 at 12:24 PM, Bae, Jae Hyeon <me...@gmail.com>
> > wrote:
> >
> >> I read through SAMZA-375. We will do one more round PoC Samza on Mesos.
> >>
> >> On Thu, Jan 22, 2015 at 11:14 AM, Bae, Jae Hyeon <me...@gmail.com>
> >> wrote:
> >>
> >> > I asked Mantis guy about orphaned container in Mesos and he was almost
> >> > sure that Mesos won't let that happen.
> >> >
> >> > How is https://issues.apache.org/jira/browse/SAMZA-375 going?
> Depending
> >> > on the time line among stabilizing stand alone and Mesos support, our
> >> > schedule or decision will be changed.
> >> >
> >> > Thank you
> >> > Best, Jae
> >> >
> >> > On Wed, Jan 21, 2015 at 4:58 PM, Chris Riccomini <
> >> > criccomini@linkedin.com.invalid> wrote:
> >> >
> >> >> Hey all,
> >> >>
> >> >> Also, just opened this ticket to track work on samza-standalone:
> >> >>
> >> >>   https://issues.apache.org/jira/browse/SAMZA-516
> >> >>
> >> >> Cheers,
> >> >> Chris
> >> >>
> >> >> On 1/21/15 1:32 PM, "Chris Riccomini" <cr...@linkedin.com>
> wrote:
> >> >>
> >> >> >Hey Jae,
> >> >> >
> >> >> >> So, we need to find out running Samza on Mesos won't create that
> >> >> >>problem, or Spark Streaming won't have that issue. In the worst
> case,
> >> >> >>creating our own distribution coordination might be more
> predictable
> >> >> >>instead of running Yarn on EMR.
> >> >> >
> >> >> >I think that there are two ways to fix this. One is to have the
> Kafka
> >> >> >broker detect that there are two producers that are "the same", and
> >> start
> >> >> >dropping messages from the "old one" (and perhaps throw an exception
> >> to
> >> >> >the old producer). The other way is to have the Samza container
> detect
> >> >> the
> >> >> >problem, and kill itself.
> >> >> >
> >> >> >The kafka-based approach is a subset of the transactionality feature
> >> >> >described here:
> >> >> >
> >> >> >
> >> >> >
> >> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+
> >> >> >i
> >> >> >n+Kafka
> >> >> >
> >> >> >The problem with the Kafka approach is that 1) it's kafka-specific,
> >> and
> >> >> 2)
> >> >> >the generation id required to drop messages from an orphaned
> producer
> >> >> >hasn't been implemented, except in a branch that's not been
> committed.
> >> >> >
> >> >> >So, if we accept that we shouldn't use Kafka as the solution for
> >> >> detecting
> >> >> >orphaned containers, the solution will have to go into Samza. Within
> >> >> >Samza, there are two approaches. One is to use the resource
> scheduler
> >> >> >(YARN, Mesos, etc.) to detect the problem. The other solution is to
> >> use
> >> >> >Samza, itself, to detect the problem.
> >> >> >
> >> >> >A YARN-specific example of how to solve the problem would be to have
> >> the
> >> >> >SamzaContainer periodically poll its local NM's REST endpoint:
> >> >> >
> >> >> >  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
> >> >> >
> >> >> >To see what the status is, its last update time, etc. If the REST
> >> >> endpoint
> >> >> >can't be reached, the node is unhealthy, or the last update time is
> >
> >> >> some
> >> >> >time interval, the container could kill itself. Again, this is
> >> >> >YARN-specific.
> >> >> >
> >> >> >I am not sure how Mesos handles split-brain. I've asked Tim Chen on
> >> >> >SAMZA-375:
> >> >> >
> >> >> >
> >> >> >
> >> >>
> >>
> https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&
> >> >> >p
> >> >>
> >> >>
> >>
> >age=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comme
> >> >> >n
> >> >> >t-14286204
> >> >> >
> >> >> >The last solution that I mentioned, using Samza directly (no
> >> dependency
> >> >> on
> >> >> >Kafka, YARN, Mesos, etc), seems like the best long-term solution to
> >> me.
> >> >> We
> >> >> >can either 1) introduce a heartbeat message into the coordinator
> >> stream,
> >> >> >or 2) use the existing checkpoint message as a heartbeat.  There is
> >> some
> >> >> >complexity to this solution that would need to be thought through,
> >> >> though.
> >> >> >For example, should the heartbeat messages be sent from the main
> >> thread?
> >> >> >What happens if the main thread is blocked on process() for an
> >> extended
> >> >> >period of time?
> >> >> >
> >> >> >What do others think? As a short-term fix, it seems to me like
> >> YARN/Mesos
> >> >> >should handle this automatically for us. Has anyone had experience
> >> with
> >> >> >orphaned containers in Mesos?
> >> >> >
> >> >> >> I really appreciate if you give me some guideline about
> implementing
> >> >> >>custom cluster management interface of Samza.
> >> >> >
> >> >> >Samza jobs are started through bin/run-job.sh (inside samza-shell).
> >> This
> >> >> >CLI uses JobRunner to instantiate a StreamJobFactory (defined with
> >> >> >job.factory.class), which returns a StreamJob. To implement your own
> >> >> >cluster management, the first thing you'll need to do is implement
> >> >> >StreamJobFactory and StreamJob. You can have a look at YarnJob or
> >> >> >ProcessJob/ProcessJobFactory for an example of how to do this.
> >> >> >
> >> >> >Note that this code has changed slightly between 0.8.0 and master
> >> >> (0.9.0).
> >> >> >In 0.9.0, the partition-to-container assignment logic has been
> pulled
> >> out
> >> >> >of YARN's AM, and into a JobCoordinator class.
> >> >> >
> >> >> >The trick with adding EC2 ASG is going to be in handling partition
> >> >> >shifting when a new node is added to the group. For example, if you
> >> have
> >> >> >two machines, each running one container, and you add a third
> machine,
> >> >> >some of the input partitions (and corresponding StreamTasks) need to
> >> be
> >> >> >shifted from the two machines on to the third. The only way to do
> this
> >> >> >right now is to:
> >> >> >
> >> >> >1. Stop all containers.
> >> >> >2. Re-instantiate the JobCoordinator with a new container count.
> >> >> >3. Start new containers on all three machines with the new partition
> >> >> >assignments.
> >> >> >
> >> >> >In an ideal world, steps (1-3) would be handled automatically by
> >> Samza,
> >> >> >and wouldn't require container restarts. This is precisely what
> >> >> >samza-standalone will accomplish. If you're interested in
> >> contributing to
> >> >> >samza-standalone, that would be awesome. I'm working on a design doc
> >> >> right
> >> >> >now, which I'm trying to post by EOW. Once that's done, we can
> >> >> collaborate
> >> >> >on design and split the code up, if you'd like.
> >> >> >
> >> >> >
> >> >> >Cheers,
> >> >> >Chris
> >> >> >
> >> >> >On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <me...@gmail.com> wrote:
> >> >> >
> >> >> >>Hi Samza Devs
> >> >> >>
> >> >> >>The significant concern I got recently is, container leak. The data
> >> >> >>pipeline based on Samza can guarantee at least once delivery but
> the
> >> >> >>duplicate rate is over 1.0%, I am having alerts right now.
> Container
> >> >> >>leaks
> >> >> >>will push a lot of alerts to me.
> >> >> >>
> >> >> >>So, we need to find out running Samza on Mesos won't create that
> >> >> problem,
> >> >> >>or Spark Streaming won't have that issue. In the worst case,
> creating
> >> >> our
> >> >> >>own distribution coordination might be more predictable instead of
> >> >> >>running
> >> >> >>Yarn on EMR.
> >> >> >>
> >> >> >>What about standalone Samza? If this is quite plausible and the
> best
> >> >> >>solution in the near future, I want to be able to contribute. Could
> >> you
> >> >> >>share your thoughts or plans?
> >> >> >>
> >> >> >>I really appreciate if you give me some guideline about
> implementing
> >> >> >>custom
> >> >> >>cluster management interface of Samza. If it's possible, I want to
> >> take
> >> >> a
> >> >> >>look to replace Yarn support with EC2 ASG stuff.
> >> >> >>
> >> >> >>Thank you
> >> >> >>Best, Jae
> >> >> >
> >> >>
> >> >>
> >> >
> >>
> >
> >
>

Re: Question on standalone Samza distribution and concerns on container leak

Posted by Chris Riccomini <cr...@apache.org>.
Hey Jae,

Here are my results from testing failure scenario (1), above. I started
hello-samza, ran a job, and then killed the RM. The NM hung around for a
while, and then killed the orphaned containers, and itself:

NM trying to reconnect to dead (or partitioned) RM:

criccomi-mn:incubator-samza-hello-samza criccomi$ jps
1650 SamzaAppMaster
1350 Kafka
1687 SamzaContainer
1321 NodeManager
461
1902 Jps
1247 QuorumPeerMain

NM decides to kill all of its containers, and itself:

criccomi-mn:incubator-samza-hello-samza criccomi$ jps
1925 Jps
1350 Kafka
461
1247 QuorumPeerMain

Here are the logs from the NM after killing the RM:

2015-01-22 12:33:22,611 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 0 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2015-01-22 12:33:23,612 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 1 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2015-01-22 12:33:24,613 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 2 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2015-01-22 12:33:25,615 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 3 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
....
2015-01-22 12:52:50,096 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 7 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRet
ries=10, sleepTime=1000 MILLISECONDS)
2015-01-22 12:52:51,097 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 8 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRet
ries=10, sleepTime=1000 MILLISECONDS)
2015-01-22 12:52:52,098 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 9 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRet
ries=10, sleepTime=1000 MILLISECONDS)

You can see that the NM ran for 20 minutes. I believe this is tunable with
configs in:


https://hadoop.apache.org/docs/r2.4.0/hadoop-yarn/hadoop-yarn-common/yarn-default.xml

Once the final timeout happens, the NM shuts all containers down, and kills
itself:

2015-01-22 12:52:52,217 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Applications still running : [application_1421958559415_0001]
2015-01-22 12:52:52,219 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Waiting for Applications to be Finished
2015-01-22 12:52:52,220 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application:
Application application_1421958559415_0001 transitioned from RUNNING to
FINISHING_C
ONTAINERS_WAIT2015-01-22 12:52:52,220 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
Container container_1421958559415_0001_01_000002 transitioned from RUNNING
to KILLING
2015-01-22 12:52:52,220 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
Container container_1421958559415_0001_01_000001 transitioned from RUNNING
to KILLING
2015-01-22 12:52:52,220 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
Cleaning up container container_1421958559415_0001_01_000002
2015-01-22 12:52:52,254 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
Cleaning up container container_1421958559415_0001_01_000001
2015-01-22 12:52:52,508 WARN
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
code from container container_1421958559415_0001_01_000002 is : 137

When we implement samza-standalone, we will probably have to follow a very
similar procedure. If we detect a network split, we'll retry for a little
while, and then kill all containers to avoid having duplicates. I am
willing to bet that Mesos slaves follow exactly the same behavior when they
can't contact the master.

What I'm getting at here is that I think that this is pretty unavoidable.
The best you can do is wait a little while, and then kill the duplicate
(orphaned) containers.

Cheers,
Chris

On Thu, Jan 22, 2015 at 12:42 PM, Chris Riccomini <cr...@apache.org>
wrote:

> Hey Jae,
>
> Every resource manager has to solve the split-brain/orphaned container
> problem. There are several issues to check:
>
> 1. Simulate a network partition between the master (RM in YARN) and slave
> (NM in YARN).
> 2. `kill -9` the slave (NM in YARN).
>
> In YARN's case, I know for sure that (2) will result in the containers
> being leaked. The PPID on the container will be switched to 1. This is just
> how UNIX works. I suspect `kill -9`'ing the slave in Mesos will result in
> the same behavior.
>
> For (1), every distributed system has to solve this. How do you detect a
> real partition (vs. a long GC, for example), and when you do detect a
> partition, how do you react to it.
>
> I am testing (1) for YARN right now (using hello-samza, and killing the
> RM). I will let you know how it behaves shortly. I believe it retries to
> connect to the RM for some period of time, and then the NM kills itself if
> it can't. If this is the case, then the container *would not be orphaned*.
> I also believe the retry count and wait time is tunable, so you can define
> your own exposure (e.g. you have a duplicate container for 1 minute, before
> the NM shuts itself down).
>
> Anecdotally, we've not seen leaked containers in YARN since we began
> properly shutting down NMs (not kill -9'ing them).
>
> > Depending on the time line among stabilizing stand alone and Mesos
> support
>
> Regarding stabilizing standalone, I'm working on the design doc right now.
> A proposed sketch of a ZK-based implementation was posted on SAMZA-516
> yesterday. My goal is to get the design doc done by tomorrow. This would
> let us discuss and open subtasks next week, and start coding thereafter.
> Realistically, I think standalone can be committed before end of Q1, and
> should be usable. After a month or two of operation, I'd wager it'll be
> relatively stable. So, that puts things at mid-Q2.
>
> Cheers,
> Chris
>
> On Thu, Jan 22, 2015 at 12:24 PM, Bae, Jae Hyeon <me...@gmail.com>
> wrote:
>
>> I read through SAMZA-375. We will do one more round PoC Samza on Mesos.
>>
>> On Thu, Jan 22, 2015 at 11:14 AM, Bae, Jae Hyeon <me...@gmail.com>
>> wrote:
>>
>> > I asked Mantis guy about orphaned container in Mesos and he was almost
>> > sure that Mesos won't let that happen.
>> >
>> > How is https://issues.apache.org/jira/browse/SAMZA-375 going? Depending
>> > on the time line among stabilizing stand alone and Mesos support, our
>> > schedule or decision will be changed.
>> >
>> > Thank you
>> > Best, Jae
>> >
>> > On Wed, Jan 21, 2015 at 4:58 PM, Chris Riccomini <
>> > criccomini@linkedin.com.invalid> wrote:
>> >
>> >> Hey all,
>> >>
>> >> Also, just opened this ticket to track work on samza-standalone:
>> >>
>> >>   https://issues.apache.org/jira/browse/SAMZA-516
>> >>
>> >> Cheers,
>> >> Chris
>> >>
>> >> On 1/21/15 1:32 PM, "Chris Riccomini" <cr...@linkedin.com> wrote:
>> >>
>> >> >Hey Jae,
>> >> >
>> >> >> So, we need to find out running Samza on Mesos won't create that
>> >> >>problem, or Spark Streaming won't have that issue. In the worst case,
>> >> >>creating our own distribution coordination might be more predictable
>> >> >>instead of running Yarn on EMR.
>> >> >
>> >> >I think that there are two ways to fix this. One is to have the Kafka
>> >> >broker detect that there are two producers that are "the same", and
>> start
>> >> >dropping messages from the "old one" (and perhaps throw an exception
>> to
>> >> >the old producer). The other way is to have the Samza container detect
>> >> the
>> >> >problem, and kill itself.
>> >> >
>> >> >The kafka-based approach is a subset of the transactionality feature
>> >> >described here:
>> >> >
>> >> >
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+
>> >> >i
>> >> >n+Kafka
>> >> >
>> >> >The problem with the Kafka approach is that 1) it's kafka-specific,
>> and
>> >> 2)
>> >> >the generation id required to drop messages from an orphaned producer
>> >> >hasn't been implemented, except in a branch that's not been committed.
>> >> >
>> >> >So, if we accept that we shouldn't use Kafka as the solution for
>> >> detecting
>> >> >orphaned containers, the solution will have to go into Samza. Within
>> >> >Samza, there are two approaches. One is to use the resource scheduler
>> >> >(YARN, Mesos, etc.) to detect the problem. The other solution is to
>> use
>> >> >Samza, itself, to detect the problem.
>> >> >
>> >> >A YARN-specific example of how to solve the problem would be to have
>> the
>> >> >SamzaContainer periodically poll its local NM's REST endpoint:
>> >> >
>> >> >  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
>> >> >
>> >> >To see what the status is, its last update time, etc. If the REST
>> >> endpoint
>> >> >can't be reached, the node is unhealthy, or the last update time is >
>> >> some
>> >> >time interval, the container could kill itself. Again, this is
>> >> >YARN-specific.
>> >> >
>> >> >I am not sure how Mesos handles split-brain. I've asked Tim Chen on
>> >> >SAMZA-375:
>> >> >
>> >> >
>> >> >
>> >>
>> https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&
>> >> >p
>> >>
>> >>
>> >age=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comme
>> >> >n
>> >> >t-14286204
>> >> >
>> >> >The last solution that I mentioned, using Samza directly (no
>> dependency
>> >> on
>> >> >Kafka, YARN, Mesos, etc), seems like the best long-term solution to
>> me.
>> >> We
>> >> >can either 1) introduce a heartbeat message into the coordinator
>> stream,
>> >> >or 2) use the existing checkpoint message as a heartbeat.  There is
>> some
>> >> >complexity to this solution that would need to be thought through,
>> >> though.
>> >> >For example, should the heartbeat messages be sent from the main
>> thread?
>> >> >What happens if the main thread is blocked on process() for an
>> extended
>> >> >period of time?
>> >> >
>> >> >What do others think? As a short-term fix, it seems to me like
>> YARN/Mesos
>> >> >should handle this automatically for us. Has anyone had experience
>> with
>> >> >orphaned containers in Mesos?
>> >> >
>> >> >> I really appreciate if you give me some guideline about implementing
>> >> >>custom cluster management interface of Samza.
>> >> >
>> >> >Samza jobs are started through bin/run-job.sh (inside samza-shell).
>> This
>> >> >CLI uses JobRunner to instantiate a StreamJobFactory (defined with
>> >> >job.factory.class), which returns a StreamJob. To implement your own
>> >> >cluster management, the first thing you'll need to do is implement
>> >> >StreamJobFactory and StreamJob. You can have a look at YarnJob or
>> >> >ProcessJob/ProcessJobFactory for an example of how to do this.
>> >> >
>> >> >Note that this code has changed slightly between 0.8.0 and master
>> >> (0.9.0).
>> >> >In 0.9.0, the partition-to-container assignment logic has been pulled
>> out
>> >> >of YARN's AM, and into a JobCoordinator class.
>> >> >
>> >> >The trick with adding EC2 ASG is going to be in handling partition
>> >> >shifting when a new node is added to the group. For example, if you
>> have
>> >> >two machines, each running one container, and you add a third machine,
>> >> >some of the input partitions (and corresponding StreamTasks) need to
>> be
>> >> >shifted from the two machines on to the third. The only way to do this
>> >> >right now is to:
>> >> >
>> >> >1. Stop all containers.
>> >> >2. Re-instantiate the JobCoordinator with a new container count.
>> >> >3. Start new containers on all three machines with the new partition
>> >> >assignments.
>> >> >
>> >> >In an ideal world, steps (1-3) would be handled automatically by
>> Samza,
>> >> >and wouldn't require container restarts. This is precisely what
>> >> >samza-standalone will accomplish. If you're interested in
>> contributing to
>> >> >samza-standalone, that would be awesome. I'm working on a design doc
>> >> right
>> >> >now, which I'm trying to post by EOW. Once that's done, we can
>> >> collaborate
>> >> >on design and split the code up, if you'd like.
>> >> >
>> >> >
>> >> >Cheers,
>> >> >Chris
>> >> >
>> >> >On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <me...@gmail.com> wrote:
>> >> >
>> >> >>Hi Samza Devs
>> >> >>
>> >> >>The significant concern I got recently is, container leak. The data
>> >> >>pipeline based on Samza can guarantee at least once delivery but the
>> >> >>duplicate rate is over 1.0%, I am having alerts right now. Container
>> >> >>leaks
>> >> >>will push a lot of alerts to me.
>> >> >>
>> >> >>So, we need to find out running Samza on Mesos won't create that
>> >> problem,
>> >> >>or Spark Streaming won't have that issue. In the worst case, creating
>> >> our
>> >> >>own distribution coordination might be more predictable instead of
>> >> >>running
>> >> >>Yarn on EMR.
>> >> >>
>> >> >>What about standalone Samza? If this is quite plausible and the best
>> >> >>solution in the near future, I want to be able to contribute. Could
>> you
>> >> >>share your thoughts or plans?
>> >> >>
>> >> >>I really appreciate if you give me some guideline about implementing
>> >> >>custom
>> >> >>cluster management interface of Samza. If it's possible, I want to
>> take
>> >> a
>> >> >>look to replace Yarn support with EC2 ASG stuff.
>> >> >>
>> >> >>Thank you
>> >> >>Best, Jae
>> >> >
>> >>
>> >>
>> >
>>
>
>

Re: Question on standalone Samza distribution and concerns on container leak

Posted by Chris Riccomini <cr...@apache.org>.
Hey Jae,

Every resource manager has to solve the split-brain/orphaned container
problem. There are several issues to check:

1. Simulate a network partition between the master (RM in YARN) and slave
(NM in YARN).
2. `kill -9` the slave (NM in YARN).

In YARN's case, I know for sure that (2) will result in the containers
being leaked. The PPID on the container will be switched to 1. This is just
how UNIX works. I suspect `kill -9`'ing the slave in Mesos will result in
the same behavior.

For (1), every distributed system has to solve this. How do you detect a
real partition (vs. a long GC, for example), and when you do detect a
partition, how do you react to it.

I am testing (1) for YARN right now (using hello-samza, and killing the
RM). I will let you know how it behaves shortly. I believe it retries to
connect to the RM for some period of time, and then the NM kills itself if
it can't. If this is the case, then the container *would not be orphaned*.
I also believe the retry count and wait time is tunable, so you can define
your own exposure (e.g. you have a duplicate container for 1 minute, before
the NM shuts itself down).

Anecdotally, we've not seen leaked containers in YARN since we began
properly shutting down NMs (not kill -9'ing them).

> Depending on the time line among stabilizing stand alone and Mesos support

Regarding stabilizing standalone, I'm working on the design doc right now.
A proposed sketch of a ZK-based implementation was posted on SAMZA-516
yesterday. My goal is to get the design doc done by tomorrow. This would
let us discuss and open subtasks next week, and start coding thereafter.
Realistically, I think standalone can be committed before end of Q1, and
should be usable. After a month or two of operation, I'd wager it'll be
relatively stable. So, that puts things at mid-Q2.

Cheers,
Chris

On Thu, Jan 22, 2015 at 12:24 PM, Bae, Jae Hyeon <me...@gmail.com> wrote:

> I read through SAMZA-375. We will do one more round PoC Samza on Mesos.
>
> On Thu, Jan 22, 2015 at 11:14 AM, Bae, Jae Hyeon <me...@gmail.com>
> wrote:
>
> > I asked Mantis guy about orphaned container in Mesos and he was almost
> > sure that Mesos won't let that happen.
> >
> > How is https://issues.apache.org/jira/browse/SAMZA-375 going? Depending
> > on the time line among stabilizing stand alone and Mesos support, our
> > schedule or decision will be changed.
> >
> > Thank you
> > Best, Jae
> >
> > On Wed, Jan 21, 2015 at 4:58 PM, Chris Riccomini <
> > criccomini@linkedin.com.invalid> wrote:
> >
> >> Hey all,
> >>
> >> Also, just opened this ticket to track work on samza-standalone:
> >>
> >>   https://issues.apache.org/jira/browse/SAMZA-516
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 1/21/15 1:32 PM, "Chris Riccomini" <cr...@linkedin.com> wrote:
> >>
> >> >Hey Jae,
> >> >
> >> >> So, we need to find out running Samza on Mesos won't create that
> >> >>problem, or Spark Streaming won't have that issue. In the worst case,
> >> >>creating our own distribution coordination might be more predictable
> >> >>instead of running Yarn on EMR.
> >> >
> >> >I think that there are two ways to fix this. One is to have the Kafka
> >> >broker detect that there are two producers that are "the same", and
> start
> >> >dropping messages from the "old one" (and perhaps throw an exception to
> >> >the old producer). The other way is to have the Samza container detect
> >> the
> >> >problem, and kill itself.
> >> >
> >> >The kafka-based approach is a subset of the transactionality feature
> >> >described here:
> >> >
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+
> >> >i
> >> >n+Kafka
> >> >
> >> >The problem with the Kafka approach is that 1) it's kafka-specific, and
> >> 2)
> >> >the generation id required to drop messages from an orphaned producer
> >> >hasn't been implemented, except in a branch that's not been committed.
> >> >
> >> >So, if we accept that we shouldn't use Kafka as the solution for
> >> detecting
> >> >orphaned containers, the solution will have to go into Samza. Within
> >> >Samza, there are two approaches. One is to use the resource scheduler
> >> >(YARN, Mesos, etc.) to detect the problem. The other solution is to use
> >> >Samza, itself, to detect the problem.
> >> >
> >> >A YARN-specific example of how to solve the problem would be to have
> the
> >> >SamzaContainer periodically poll its local NM's REST endpoint:
> >> >
> >> >  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
> >> >
> >> >To see what the status is, its last update time, etc. If the REST
> >> endpoint
> >> >can't be reached, the node is unhealthy, or the last update time is >
> >> some
> >> >time interval, the container could kill itself. Again, this is
> >> >YARN-specific.
> >> >
> >> >I am not sure how Mesos handles split-brain. I've asked Tim Chen on
> >> >SAMZA-375:
> >> >
> >> >
> >> >
> >>
> https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&
> >> >p
> >>
> >>
> >age=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comme
> >> >n
> >> >t-14286204
> >> >
> >> >The last solution that I mentioned, using Samza directly (no dependency
> >> on
> >> >Kafka, YARN, Mesos, etc), seems like the best long-term solution to me.
> >> We
> >> >can either 1) introduce a heartbeat message into the coordinator
> stream,
> >> >or 2) use the existing checkpoint message as a heartbeat.  There is
> some
> >> >complexity to this solution that would need to be thought through,
> >> though.
> >> >For example, should the heartbeat messages be sent from the main
> thread?
> >> >What happens if the main thread is blocked on process() for an extended
> >> >period of time?
> >> >
> >> >What do others think? As a short-term fix, it seems to me like
> YARN/Mesos
> >> >should handle this automatically for us. Has anyone had experience with
> >> >orphaned containers in Mesos?
> >> >
> >> >> I really appreciate if you give me some guideline about implementing
> >> >>custom cluster management interface of Samza.
> >> >
> >> >Samza jobs are started through bin/run-job.sh (inside samza-shell).
> This
> >> >CLI uses JobRunner to instantiate a StreamJobFactory (defined with
> >> >job.factory.class), which returns a StreamJob. To implement your own
> >> >cluster management, the first thing you'll need to do is implement
> >> >StreamJobFactory and StreamJob. You can have a look at YarnJob or
> >> >ProcessJob/ProcessJobFactory for an example of how to do this.
> >> >
> >> >Note that this code has changed slightly between 0.8.0 and master
> >> (0.9.0).
> >> >In 0.9.0, the partition-to-container assignment logic has been pulled
> out
> >> >of YARN's AM, and into a JobCoordinator class.
> >> >
> >> >The trick with adding EC2 ASG is going to be in handling partition
> >> >shifting when a new node is added to the group. For example, if you
> have
> >> >two machines, each running one container, and you add a third machine,
> >> >some of the input partitions (and corresponding StreamTasks) need to be
> >> >shifted from the two machines on to the third. The only way to do this
> >> >right now is to:
> >> >
> >> >1. Stop all containers.
> >> >2. Re-instantiate the JobCoordinator with a new container count.
> >> >3. Start new containers on all three machines with the new partition
> >> >assignments.
> >> >
> >> >In an ideal world, steps (1-3) would be handled automatically by Samza,
> >> >and wouldn't require container restarts. This is precisely what
> >> >samza-standalone will accomplish. If you're interested in contributing
> to
> >> >samza-standalone, that would be awesome. I'm working on a design doc
> >> right
> >> >now, which I'm trying to post by EOW. Once that's done, we can
> >> collaborate
> >> >on design and split the code up, if you'd like.
> >> >
> >> >
> >> >Cheers,
> >> >Chris
> >> >
> >> >On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <me...@gmail.com> wrote:
> >> >
> >> >>Hi Samza Devs
> >> >>
> >> >>The significant concern I got recently is, container leak. The data
> >> >>pipeline based on Samza can guarantee at least once delivery but the
> >> >>duplicate rate is over 1.0%, I am having alerts right now. Container
> >> >>leaks
> >> >>will push a lot of alerts to me.
> >> >>
> >> >>So, we need to find out running Samza on Mesos won't create that
> >> problem,
> >> >>or Spark Streaming won't have that issue. In the worst case, creating
> >> our
> >> >>own distribution coordination might be more predictable instead of
> >> >>running
> >> >>Yarn on EMR.
> >> >>
> >> >>What about standalone Samza? If this is quite plausible and the best
> >> >>solution in the near future, I want to be able to contribute. Could
> you
> >> >>share your thoughts or plans?
> >> >>
> >> >>I really appreciate if you give me some guideline about implementing
> >> >>custom
> >> >>cluster management interface of Samza. If it's possible, I want to
> take
> >> a
> >> >>look to replace Yarn support with EC2 ASG stuff.
> >> >>
> >> >>Thank you
> >> >>Best, Jae
> >> >
> >>
> >>
> >
>

Re: Question on standalone Samza distribution and concerns on container leak

Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
I read through SAMZA-375. We will do one more round PoC Samza on Mesos.

On Thu, Jan 22, 2015 at 11:14 AM, Bae, Jae Hyeon <me...@gmail.com> wrote:

> I asked Mantis guy about orphaned container in Mesos and he was almost
> sure that Mesos won't let that happen.
>
> How is https://issues.apache.org/jira/browse/SAMZA-375 going? Depending
> on the time line among stabilizing stand alone and Mesos support, our
> schedule or decision will be changed.
>
> Thank you
> Best, Jae
>
> On Wed, Jan 21, 2015 at 4:58 PM, Chris Riccomini <
> criccomini@linkedin.com.invalid> wrote:
>
>> Hey all,
>>
>> Also, just opened this ticket to track work on samza-standalone:
>>
>>   https://issues.apache.org/jira/browse/SAMZA-516
>>
>> Cheers,
>> Chris
>>
>> On 1/21/15 1:32 PM, "Chris Riccomini" <cr...@linkedin.com> wrote:
>>
>> >Hey Jae,
>> >
>> >> So, we need to find out running Samza on Mesos won't create that
>> >>problem, or Spark Streaming won't have that issue. In the worst case,
>> >>creating our own distribution coordination might be more predictable
>> >>instead of running Yarn on EMR.
>> >
>> >I think that there are two ways to fix this. One is to have the Kafka
>> >broker detect that there are two producers that are "the same", and start
>> >dropping messages from the "old one" (and perhaps throw an exception to
>> >the old producer). The other way is to have the Samza container detect
>> the
>> >problem, and kill itself.
>> >
>> >The kafka-based approach is a subset of the transactionality feature
>> >described here:
>> >
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+
>> >i
>> >n+Kafka
>> >
>> >The problem with the Kafka approach is that 1) it's kafka-specific, and
>> 2)
>> >the generation id required to drop messages from an orphaned producer
>> >hasn't been implemented, except in a branch that's not been committed.
>> >
>> >So, if we accept that we shouldn't use Kafka as the solution for
>> detecting
>> >orphaned containers, the solution will have to go into Samza. Within
>> >Samza, there are two approaches. One is to use the resource scheduler
>> >(YARN, Mesos, etc.) to detect the problem. The other solution is to use
>> >Samza, itself, to detect the problem.
>> >
>> >A YARN-specific example of how to solve the problem would be to have the
>> >SamzaContainer periodically poll its local NM's REST endpoint:
>> >
>> >  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
>> >
>> >To see what the status is, its last update time, etc. If the REST
>> endpoint
>> >can't be reached, the node is unhealthy, or the last update time is >
>> some
>> >time interval, the container could kill itself. Again, this is
>> >YARN-specific.
>> >
>> >I am not sure how Mesos handles split-brain. I've asked Tim Chen on
>> >SAMZA-375:
>> >
>> >
>> >
>> https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&
>> >p
>>
>> >age=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comme
>> >n
>> >t-14286204
>> >
>> >The last solution that I mentioned, using Samza directly (no dependency
>> on
>> >Kafka, YARN, Mesos, etc), seems like the best long-term solution to me.
>> We
>> >can either 1) introduce a heartbeat message into the coordinator stream,
>> >or 2) use the existing checkpoint message as a heartbeat.  There is some
>> >complexity to this solution that would need to be thought through,
>> though.
>> >For example, should the heartbeat messages be sent from the main thread?
>> >What happens if the main thread is blocked on process() for an extended
>> >period of time?
>> >
>> >What do others think? As a short-term fix, it seems to me like YARN/Mesos
>> >should handle this automatically for us. Has anyone had experience with
>> >orphaned containers in Mesos?
>> >
>> >> I really appreciate if you give me some guideline about implementing
>> >>custom cluster management interface of Samza.
>> >
>> >Samza jobs are started through bin/run-job.sh (inside samza-shell). This
>> >CLI uses JobRunner to instantiate a StreamJobFactory (defined with
>> >job.factory.class), which returns a StreamJob. To implement your own
>> >cluster management, the first thing you'll need to do is implement
>> >StreamJobFactory and StreamJob. You can have a look at YarnJob or
>> >ProcessJob/ProcessJobFactory for an example of how to do this.
>> >
>> >Note that this code has changed slightly between 0.8.0 and master
>> (0.9.0).
>> >In 0.9.0, the partition-to-container assignment logic has been pulled out
>> >of YARN's AM, and into a JobCoordinator class.
>> >
>> >The trick with adding EC2 ASG is going to be in handling partition
>> >shifting when a new node is added to the group. For example, if you have
>> >two machines, each running one container, and you add a third machine,
>> >some of the input partitions (and corresponding StreamTasks) need to be
>> >shifted from the two machines on to the third. The only way to do this
>> >right now is to:
>> >
>> >1. Stop all containers.
>> >2. Re-instantiate the JobCoordinator with a new container count.
>> >3. Start new containers on all three machines with the new partition
>> >assignments.
>> >
>> >In an ideal world, steps (1-3) would be handled automatically by Samza,
>> >and wouldn't require container restarts. This is precisely what
>> >samza-standalone will accomplish. If you're interested in contributing to
>> >samza-standalone, that would be awesome. I'm working on a design doc
>> right
>> >now, which I'm trying to post by EOW. Once that's done, we can
>> collaborate
>> >on design and split the code up, if you'd like.
>> >
>> >
>> >Cheers,
>> >Chris
>> >
>> >On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <me...@gmail.com> wrote:
>> >
>> >>Hi Samza Devs
>> >>
>> >>The significant concern I got recently is, container leak. The data
>> >>pipeline based on Samza can guarantee at least once delivery but the
>> >>duplicate rate is over 1.0%, I am having alerts right now. Container
>> >>leaks
>> >>will push a lot of alerts to me.
>> >>
>> >>So, we need to find out running Samza on Mesos won't create that
>> problem,
>> >>or Spark Streaming won't have that issue. In the worst case, creating
>> our
>> >>own distribution coordination might be more predictable instead of
>> >>running
>> >>Yarn on EMR.
>> >>
>> >>What about standalone Samza? If this is quite plausible and the best
>> >>solution in the near future, I want to be able to contribute. Could you
>> >>share your thoughts or plans?
>> >>
>> >>I really appreciate if you give me some guideline about implementing
>> >>custom
>> >>cluster management interface of Samza. If it's possible, I want to take
>> a
>> >>look to replace Yarn support with EC2 ASG stuff.
>> >>
>> >>Thank you
>> >>Best, Jae
>> >
>>
>>
>

Re: Question on standalone Samza distribution and concerns on container leak

Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
I asked Mantis guy about orphaned container in Mesos and he was almost sure
that Mesos won't let that happen.

How is https://issues.apache.org/jira/browse/SAMZA-375 going? Depending on
the time line among stabilizing stand alone and Mesos support, our schedule
or decision will be changed.

Thank you
Best, Jae

On Wed, Jan 21, 2015 at 4:58 PM, Chris Riccomini <
criccomini@linkedin.com.invalid> wrote:

> Hey all,
>
> Also, just opened this ticket to track work on samza-standalone:
>
>   https://issues.apache.org/jira/browse/SAMZA-516
>
> Cheers,
> Chris
>
> On 1/21/15 1:32 PM, "Chris Riccomini" <cr...@linkedin.com> wrote:
>
> >Hey Jae,
> >
> >> So, we need to find out running Samza on Mesos won't create that
> >>problem, or Spark Streaming won't have that issue. In the worst case,
> >>creating our own distribution coordination might be more predictable
> >>instead of running Yarn on EMR.
> >
> >I think that there are two ways to fix this. One is to have the Kafka
> >broker detect that there are two producers that are "the same", and start
> >dropping messages from the "old one" (and perhaps throw an exception to
> >the old producer). The other way is to have the Samza container detect the
> >problem, and kill itself.
> >
> >The kafka-based approach is a subset of the transactionality feature
> >described here:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+
> >i
> >n+Kafka
> >
> >The problem with the Kafka approach is that 1) it's kafka-specific, and 2)
> >the generation id required to drop messages from an orphaned producer
> >hasn't been implemented, except in a branch that's not been committed.
> >
> >So, if we accept that we shouldn't use Kafka as the solution for detecting
> >orphaned containers, the solution will have to go into Samza. Within
> >Samza, there are two approaches. One is to use the resource scheduler
> >(YARN, Mesos, etc.) to detect the problem. The other solution is to use
> >Samza, itself, to detect the problem.
> >
> >A YARN-specific example of how to solve the problem would be to have the
> >SamzaContainer periodically poll its local NM's REST endpoint:
> >
> >  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
> >
> >To see what the status is, its last update time, etc. If the REST endpoint
> >can't be reached, the node is unhealthy, or the last update time is > some
> >time interval, the container could kill itself. Again, this is
> >YARN-specific.
> >
> >I am not sure how Mesos handles split-brain. I've asked Tim Chen on
> >SAMZA-375:
> >
> >
> >
> https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&
> >p
> >age=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comme
> >n
> >t-14286204
> >
> >The last solution that I mentioned, using Samza directly (no dependency on
> >Kafka, YARN, Mesos, etc), seems like the best long-term solution to me. We
> >can either 1) introduce a heartbeat message into the coordinator stream,
> >or 2) use the existing checkpoint message as a heartbeat.  There is some
> >complexity to this solution that would need to be thought through, though.
> >For example, should the heartbeat messages be sent from the main thread?
> >What happens if the main thread is blocked on process() for an extended
> >period of time?
> >
> >What do others think? As a short-term fix, it seems to me like YARN/Mesos
> >should handle this automatically for us. Has anyone had experience with
> >orphaned containers in Mesos?
> >
> >> I really appreciate if you give me some guideline about implementing
> >>custom cluster management interface of Samza.
> >
> >Samza jobs are started through bin/run-job.sh (inside samza-shell). This
> >CLI uses JobRunner to instantiate a StreamJobFactory (defined with
> >job.factory.class), which returns a StreamJob. To implement your own
> >cluster management, the first thing you'll need to do is implement
> >StreamJobFactory and StreamJob. You can have a look at YarnJob or
> >ProcessJob/ProcessJobFactory for an example of how to do this.
> >
> >Note that this code has changed slightly between 0.8.0 and master (0.9.0).
> >In 0.9.0, the partition-to-container assignment logic has been pulled out
> >of YARN's AM, and into a JobCoordinator class.
> >
> >The trick with adding EC2 ASG is going to be in handling partition
> >shifting when a new node is added to the group. For example, if you have
> >two machines, each running one container, and you add a third machine,
> >some of the input partitions (and corresponding StreamTasks) need to be
> >shifted from the two machines on to the third. The only way to do this
> >right now is to:
> >
> >1. Stop all containers.
> >2. Re-instantiate the JobCoordinator with a new container count.
> >3. Start new containers on all three machines with the new partition
> >assignments.
> >
> >In an ideal world, steps (1-3) would be handled automatically by Samza,
> >and wouldn't require container restarts. This is precisely what
> >samza-standalone will accomplish. If you're interested in contributing to
> >samza-standalone, that would be awesome. I'm working on a design doc right
> >now, which I'm trying to post by EOW. Once that's done, we can collaborate
> >on design and split the code up, if you'd like.
> >
> >
> >Cheers,
> >Chris
> >
> >On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <me...@gmail.com> wrote:
> >
> >>Hi Samza Devs
> >>
> >>The significant concern I got recently is, container leak. The data
> >>pipeline based on Samza can guarantee at least once delivery but the
> >>duplicate rate is over 1.0%, I am having alerts right now. Container
> >>leaks
> >>will push a lot of alerts to me.
> >>
> >>So, we need to find out running Samza on Mesos won't create that problem,
> >>or Spark Streaming won't have that issue. In the worst case, creating our
> >>own distribution coordination might be more predictable instead of
> >>running
> >>Yarn on EMR.
> >>
> >>What about standalone Samza? If this is quite plausible and the best
> >>solution in the near future, I want to be able to contribute. Could you
> >>share your thoughts or plans?
> >>
> >>I really appreciate if you give me some guideline about implementing
> >>custom
> >>cluster management interface of Samza. If it's possible, I want to take a
> >>look to replace Yarn support with EC2 ASG stuff.
> >>
> >>Thank you
> >>Best, Jae
> >
>
>

Re: Question on standalone Samza distribution and concerns on container leak

Posted by Chris Riccomini <cr...@linkedin.com.INVALID>.
Hey all,

Also, just opened this ticket to track work on samza-standalone:

  https://issues.apache.org/jira/browse/SAMZA-516

Cheers,
Chris

On 1/21/15 1:32 PM, "Chris Riccomini" <cr...@linkedin.com> wrote:

>Hey Jae,
>
>> So, we need to find out running Samza on Mesos won't create that
>>problem, or Spark Streaming won't have that issue. In the worst case,
>>creating our own distribution coordination might be more predictable
>>instead of running Yarn on EMR.
>
>I think that there are two ways to fix this. One is to have the Kafka
>broker detect that there are two producers that are "the same", and start
>dropping messages from the "old one" (and perhaps throw an exception to
>the old producer). The other way is to have the Samza container detect the
>problem, and kill itself.
>
>The kafka-based approach is a subset of the transactionality feature
>described here:
>
>  
>https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+
>i
>n+Kafka
>
>The problem with the Kafka approach is that 1) it's kafka-specific, and 2)
>the generation id required to drop messages from an orphaned producer
>hasn't been implemented, except in a branch that's not been committed.
>
>So, if we accept that we shouldn't use Kafka as the solution for detecting
>orphaned containers, the solution will have to go into Samza. Within
>Samza, there are two approaches. One is to use the resource scheduler
>(YARN, Mesos, etc.) to detect the problem. The other solution is to use
>Samza, itself, to detect the problem.
>
>A YARN-specific example of how to solve the problem would be to have the
>SamzaContainer periodically poll its local NM's REST endpoint:
>
>  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
>
>To see what the status is, its last update time, etc. If the REST endpoint
>can't be reached, the node is unhealthy, or the last update time is > some
>time interval, the container could kill itself. Again, this is
>YARN-specific.
>
>I am not sure how Mesos handles split-brain. I've asked Tim Chen on
>SAMZA-375:
>
>  
>https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&
>p
>age=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comme
>n
>t-14286204
>
>The last solution that I mentioned, using Samza directly (no dependency on
>Kafka, YARN, Mesos, etc), seems like the best long-term solution to me. We
>can either 1) introduce a heartbeat message into the coordinator stream,
>or 2) use the existing checkpoint message as a heartbeat.  There is some
>complexity to this solution that would need to be thought through, though.
>For example, should the heartbeat messages be sent from the main thread?
>What happens if the main thread is blocked on process() for an extended
>period of time?
>
>What do others think? As a short-term fix, it seems to me like YARN/Mesos
>should handle this automatically for us. Has anyone had experience with
>orphaned containers in Mesos?
>
>> I really appreciate if you give me some guideline about implementing
>>custom cluster management interface of Samza.
>
>Samza jobs are started through bin/run-job.sh (inside samza-shell). This
>CLI uses JobRunner to instantiate a StreamJobFactory (defined with
>job.factory.class), which returns a StreamJob. To implement your own
>cluster management, the first thing you'll need to do is implement
>StreamJobFactory and StreamJob. You can have a look at YarnJob or
>ProcessJob/ProcessJobFactory for an example of how to do this.
>
>Note that this code has changed slightly between 0.8.0 and master (0.9.0).
>In 0.9.0, the partition-to-container assignment logic has been pulled out
>of YARN's AM, and into a JobCoordinator class.
>
>The trick with adding EC2 ASG is going to be in handling partition
>shifting when a new node is added to the group. For example, if you have
>two machines, each running one container, and you add a third machine,
>some of the input partitions (and corresponding StreamTasks) need to be
>shifted from the two machines on to the third. The only way to do this
>right now is to:
>
>1. Stop all containers.
>2. Re-instantiate the JobCoordinator with a new container count.
>3. Start new containers on all three machines with the new partition
>assignments.
>
>In an ideal world, steps (1-3) would be handled automatically by Samza,
>and wouldn't require container restarts. This is precisely what
>samza-standalone will accomplish. If you're interested in contributing to
>samza-standalone, that would be awesome. I'm working on a design doc right
>now, which I'm trying to post by EOW. Once that's done, we can collaborate
>on design and split the code up, if you'd like.
>
>
>Cheers,
>Chris
>
>On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <me...@gmail.com> wrote:
>
>>Hi Samza Devs
>>
>>The significant concern I got recently is, container leak. The data
>>pipeline based on Samza can guarantee at least once delivery but the
>>duplicate rate is over 1.0%, I am having alerts right now. Container
>>leaks
>>will push a lot of alerts to me.
>>
>>So, we need to find out running Samza on Mesos won't create that problem,
>>or Spark Streaming won't have that issue. In the worst case, creating our
>>own distribution coordination might be more predictable instead of
>>running
>>Yarn on EMR.
>>
>>What about standalone Samza? If this is quite plausible and the best
>>solution in the near future, I want to be able to contribute. Could you
>>share your thoughts or plans?
>>
>>I really appreciate if you give me some guideline about implementing
>>custom
>>cluster management interface of Samza. If it's possible, I want to take a
>>look to replace Yarn support with EC2 ASG stuff.
>>
>>Thank you
>>Best, Jae
>


Re: Question on standalone Samza distribution and concerns on container leak

Posted by Chris Riccomini <cr...@linkedin.com.INVALID>.
Hey Jae,

> So, we need to find out running Samza on Mesos won't create that
>problem, or Spark Streaming won't have that issue. In the worst case,
>creating our own distribution coordination might be more predictable
>instead of running Yarn on EMR.

I think that there are two ways to fix this. One is to have the Kafka
broker detect that there are two producers that are "the same", and start
dropping messages from the "old one" (and perhaps throw an exception to
the old producer). The other way is to have the Samza container detect the
problem, and kill itself.

The kafka-based approach is a subset of the transactionality feature
described here:

  
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+i
n+Kafka

The problem with the Kafka approach is that 1) it's kafka-specific, and 2)
the generation id required to drop messages from an orphaned producer
hasn't been implemented, except in a branch that's not been committed.

So, if we accept that we shouldn't use Kafka as the solution for detecting
orphaned containers, the solution will have to go into Samza. Within
Samza, there are two approaches. One is to use the resource scheduler
(YARN, Mesos, etc.) to detect the problem. The other solution is to use
Samza, itself, to detect the problem.

A YARN-specific example of how to solve the problem would be to have the
SamzaContainer periodically poll its local NM's REST endpoint:

  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info

To see what the status is, its last update time, etc. If the REST endpoint
can't be reached, the node is unhealthy, or the last update time is > some
time interval, the container could kill itself. Again, this is
YARN-specific.

I am not sure how Mesos handles split-brain. I've asked Tim Chen on
SAMZA-375:

  
https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&p
age=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#commen
t-14286204

The last solution that I mentioned, using Samza directly (no dependency on
Kafka, YARN, Mesos, etc), seems like the best long-term solution to me. We
can either 1) introduce a heartbeat message into the coordinator stream,
or 2) use the existing checkpoint message as a heartbeat.  There is some
complexity to this solution that would need to be thought through, though.
For example, should the heartbeat messages be sent from the main thread?
What happens if the main thread is blocked on process() for an extended
period of time?

What do others think? As a short-term fix, it seems to me like YARN/Mesos
should handle this automatically for us. Has anyone had experience with
orphaned containers in Mesos?

> I really appreciate if you give me some guideline about implementing
>custom cluster management interface of Samza.

Samza jobs are started through bin/run-job.sh (inside samza-shell). This
CLI uses JobRunner to instantiate a StreamJobFactory (defined with
job.factory.class), which returns a StreamJob. To implement your own
cluster management, the first thing you'll need to do is implement
StreamJobFactory and StreamJob. You can have a look at YarnJob or
ProcessJob/ProcessJobFactory for an example of how to do this.

Note that this code has changed slightly between 0.8.0 and master (0.9.0).
In 0.9.0, the partition-to-container assignment logic has been pulled out
of YARN's AM, and into a JobCoordinator class.

The trick with adding EC2 ASG is going to be in handling partition
shifting when a new node is added to the group. For example, if you have
two machines, each running one container, and you add a third machine,
some of the input partitions (and corresponding StreamTasks) need to be
shifted from the two machines on to the third. The only way to do this
right now is to:

1. Stop all containers.
2. Re-instantiate the JobCoordinator with a new container count.
3. Start new containers on all three machines with the new partition
assignments.

In an ideal world, steps (1-3) would be handled automatically by Samza,
and wouldn't require container restarts. This is precisely what
samza-standalone will accomplish. If you're interested in contributing to
samza-standalone, that would be awesome. I'm working on a design doc right
now, which I'm trying to post by EOW. Once that's done, we can collaborate
on design and split the code up, if you'd like.


Cheers,
Chris

On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <me...@gmail.com> wrote:

>Hi Samza Devs
>
>The significant concern I got recently is, container leak. The data
>pipeline based on Samza can guarantee at least once delivery but the
>duplicate rate is over 1.0%, I am having alerts right now. Container leaks
>will push a lot of alerts to me.
>
>So, we need to find out running Samza on Mesos won't create that problem,
>or Spark Streaming won't have that issue. In the worst case, creating our
>own distribution coordination might be more predictable instead of running
>Yarn on EMR.
>
>What about standalone Samza? If this is quite plausible and the best
>solution in the near future, I want to be able to contribute. Could you
>share your thoughts or plans?
>
>I really appreciate if you give me some guideline about implementing
>custom
>cluster management interface of Samza. If it's possible, I want to take a
>look to replace Yarn support with EC2 ASG stuff.
>
>Thank you
>Best, Jae