Posted to dev@samza.apache.org by Alexander Filipchik <af...@gmail.com> on 2016/01/08 00:59:51 UTC

Production deployment

Hi Samza community!

I'm trying to move from a POC to a full-scale production deployment and
am looking for answers to the following questions:

1) What is the best way to handle partial outages? Let's say my YARN
cluster is deployed on Amazon across 3 availability zones. Is there a way to
guarantee operability if I lose a whole availability zone (one third of
capacity)? Will Samza just restart failed containers on the available nodes
(which means some downtime), or is there a way to have passive task
instances that can take over? What will happen if the master dies?
2) What is the best way of deploying new code? I'm especially interested in
how to deploy new tasks that maintain pretty big state without interrupting
streaming?
3) What is a good naming and versioning strategy for things like Kafka
topics, RocksDB stores, etc.?
4) What is the best way of configuring jobs? The hello-samza example bundles
configs with the tasks, so all the URLs are hidden inside a tar file. Is
there a better way to pass properties based on region, environment (prod,
test, dev), etc.?
5) I ran into a weird problem with Kafka partitioning. I created 2 Kafka
topics and 2 Samza jobs that communicate like this:
topic1 -> samza1 -> topic2 -> samza2

samza2 keeps state in RocksDB (let's say it was just storing the strings it
saw). The Kafka topics had 20 partitions. I found that messages sent by
samza1 and messages sent manually using
org.apache.kafka.clients.producer.KafkaProducer were landing on different
samza2 instances even though they had the same partition key (of type
string).

Example:
samza1 sends a message with key "key1" to samza2 via topic2, and it is
stored in task 1 of samza2.
I send a message manually to topic2 with key "key1", and it is stored in
task 10 of samza2. The code I was using to send messages from samza1:

Config:

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.key.serde=string
systems.kafka.samza.msg.serde=string

Code:

private static final SystemStream OUTPUT_STREAM =
        new SystemStream("kafka", "topic2");

messageCollector.send(new OutgoingMessageEnvelope(
        OUTPUT_STREAM, "key1", message));


Manually:

configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, s"$broker:9092")
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  classOf[StringSerializer].getName)
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  classOf[StringSerializer].getName)

val kafkaProducer = new KafkaProducer[String, String](configs)
val record = new ProducerRecord[String, String]("topic2", "key1", message)
kafkaProducer.send(record).get()

What can be wrong?

And one crazy question:
has anyone thought about combining Samza and Spark? For example, allowing
Spark to use Samza's RocksDB/LevelDB storage as a state holder for
micro-batching?

Thank you,
Alex

Re: Production deployment

Posted by Alexander Filipchik <af...@gmail.com>.
Thank you very much for taking the time to answer all my questions!

Alex

On Wed, Jan 13, 2016 at 10:18 PM, Yi Pan <ni...@gmail.com> wrote:


Re: Production deployment

Posted by Yi Pan <ni...@gmail.com>.
Hi, Alex,

I apologize for the late reply. Let me try to give some feedback and
comments below:

On Thu, Jan 7, 2016 at 3:59 PM, Alexander Filipchik <af...@gmail.com>
wrote:

>
> 1) What is the best way to handle partial outages? Let's say my YARN
> cluster is deployed on Amazon across 3 availability zones. Is there a way to
> guarantee operability if I lose a whole availability zone (one third of
> capacity)? Will Samza just restart failed containers on the available nodes
> (which means some downtime), or is there a way to have passive task
> instances that can take over? What will happen if the master dies?
>

When a node dies in YARN, there are the following situations:
a. The RM dies. Without RM HA, the whole cluster becomes unavailable in this
case, and you have to rely on ops to restart the whole YARN cluster.
b. One NM dies. In this case, there are two sub-cases: b.1, the NM only
runs a SamzaContainer; b.2, the NM runs the SamzaAppMaster. In b.1, the
SamzaAppMaster re-requests a new container from the RM and starts the
SamzaContainer in the new container. In b.2, the whole Samza job fails and
the YARN RM restarts the job. As of now, there are no "passive" standby
task instances.
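
As a rough illustration of the settings involved in the restart behaviour
described above (a sketch, not a recommended production setup): the Samza
job properties below bound how often a failing container is restarted
within a time window, and the commented lines are the YARN-side properties
(normally set in yarn-site.xml) that enable RM HA and limit AppMaster
attempts. All values are placeholders picked for the example.

```properties
# Samza job config: give up on the job if a container fails more than
# 8 times within a 5-minute window (illustrative values)
yarn.container.retry.count=8
yarn.container.retry.window.ms=300000

# YARN side, shown as key=value for brevity (these live in yarn-site.xml):
# yarn.resourcemanager.ha.enabled=true
# yarn.resourcemanager.am.max-attempts=2
```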


> 2) What is the best way of deploying new code? I'm especially interested in
> how to deploy new tasks that maintain pretty big state without interrupting
> streaming?
>

Since the configuration of a Samza job is still immutable, the way to
deploy new Samza code right now is still to re-push the binary and restart
the job. That used to take a long time if your job had big state. With the
host-affinity feature in Samza 0.10, the restarted Samza job will try its
best to use the previous hosts to run the same containers and re-use the
local state stores. At LinkedIn, we have tested this feature with big
stateful jobs and successfully cut down the re-bootstrap time.



> 3) What is a good naming and versioning strategy for things like Kafka
> topics, RocksDB stores, etc.?
>

Samza does not restrict the naming scheme an application chooses for Kafka
topics and RocksDB stores. Any scheme that makes sense and uniquely
identifies the state stores and the application's input/output streams in
the deployment environment is good enough.


> 4) What is the best way of configuring jobs? The hello-samza example bundles
> configs with the tasks, so all the URLs are hidden inside a tar file. Is
> there a better way to pass properties based on region, environment (prod,
> test, dev), etc.?
>

The way to pass in the configuration can differ a lot depending on the
deployment system. The properties-file-based configuration is just one
simple example that we used in hello-samza. It may not make sense for a
complex deployment environment. At LinkedIn, we package the binary and the
configuration in two different packages; LinkedIn's deployment system
identifies the binary bundle and the configuration bundle separately and
deploys them to specific locations on the target host. The start script
then uses the configuration location determined by the deployment system
as the path to the configuration when starting the Samza job.
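
One simple way to approximate the binary/configuration split without a
dedicated deployment system is to layer an environment-specific properties
file over shared defaults at startup. The sketch below is a minimal
illustration under that assumption; the class name LayeredConfig, the
overlay method, and the host name are hypothetical, and the inputs are
inline strings only to keep the example self-contained (a real start script
would read them from files at a deployment-specific path).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: overlays environment-specific settings on shared defaults. */
class LayeredConfig {

    /** Returns a copy of base with the key=value pairs in overridesText applied on top. */
    static Properties overlay(Properties base, String overridesText) {
        Properties merged = new Properties();
        merged.putAll(base);
        try {
            // Later loads win, so overrides replace any key already present in base.
            merged.load(new StringReader(overridesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Shared settings that ship with the job package.
        String common = "job.name=my-job\n"
                + "systems.kafka.samza.msg.serde=string\n";
        // Per-environment settings; the broker address is a placeholder.
        String prod = "systems.kafka.producer.bootstrap.servers=kafka-prod:9092\n";

        Properties config = overlay(overlay(new Properties(), common), prod);
        config.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The merged properties could then be written out as the single config file
that the job's start script is pointed at.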


> 5) I ran into a weird problem with Kafka partitioning. I created 2 Kafka
> topics and 2 Samza jobs that communicate like this:
> topic1 -> samza1 -> topic2 -> samza2
>
> samza2 keeps state in RocksDB (let's say it was just storing the strings it
> saw). The Kafka topics had 20 partitions. I found that messages sent by
> samza1 and messages sent manually using
> org.apache.kafka.clients.producer.KafkaProducer were landing on different
> samza2 instances even though they had the same partition key (of type
> string).
>
> Example:
> samza1 sends a message with key "key1" to samza2 via topic2, and it is
> stored in task 1 of samza2.
> I send a message manually to topic2 with key "key1", and it is stored in
> task 10 of samza2. The code I was using to send messages from samza1:
>
> Config:
>
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.key.serde=string
> systems.kafka.samza.msg.serde=string
>
> Code:
>
> private static final SystemStream OUTPUT_STREAM =
>         new SystemStream("kafka", "topic2");
>
> messageCollector.send(new OutgoingMessageEnvelope(
>         OUTPUT_STREAM, "key1", message));
>
>
> manually:
>
> configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, s"$broker:9092")
> configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> classOf[StringSerializer].getName)
> configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> classOf[StringSerializer].getName)
>
> val kafkaProducer = new KafkaProducer[String, String](configs)
> val record = new ProducerRecord[String, String]("topic2", "key1", message)
> kafkaProducer.send(record).get()
>
> What can be wrong?
>
>
This might be related to SAMZA-839.
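
Independently of what exactly that ticket covers, one general way for two
producers to route the same key to different partitions is to hash it
differently. The sketch below compares two schemes for the key "key1" and 20
partitions: one takes the Java hashCode of the key object (assumed here to
resemble what a Samza producer of that era does with the partition key), the
other applies murmur2 to the serialized key bytes, which is what the default
partitioner of Kafka's Java producer does. The class and method names are
hypothetical, and the murmur2 routine is written from the public algorithm,
so check it against Kafka's own Utils.murmur2 before relying on exact values.

```java
import java.nio.charset.StandardCharsets;

/** Sketch comparing two key-to-partition schemes; all names are hypothetical. */
class PartitionSketch {

    /** Scheme A: Java hashCode of the key object, modulo the partition count. */
    static int hashCodePartition(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /** Scheme B: murmur2 over the serialized key bytes, modulo the partition count. */
    static int murmur2Partition(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    /** 32-bit MurmurHash2 with the seed used by Kafka's Java client. */
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        // Mix the input four bytes at a time (little-endian).
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix the remaining 1 to 3 bytes; the cases fall through on purpose.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    public static void main(String[] args) {
        String key = "key1";
        int partitions = 20;
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        System.out.println("hashCode-based partition: " + hashCodePartition(key, partitions));
        System.out.println("murmur2-based partition:  " + murmur2Partition(keyBytes, partitions));
    }
}
```

If the two numbers differ for a given key, messages written by the two
producers end up in different partitions and therefore in different tasks
of the downstream job. Setting the partition explicitly on the
ProducerRecord, using the same scheme as the upstream job, is one way to
make a manual producer agree with it.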



> and one crazy question:
> has anyone thought about combining Samza and Spark? For example, allowing
> Spark to use Samza's RocksDB/LevelDB storage as a state holder for
> micro-batching?
>
>
I think a better question would be: can we implement micro-batching (i.e.
windowing) in Samza and provide RDDs so that Spark Streaming programs can
run on top of Samza? That's an interesting thought, which would allow a
unified programming model across both the online and offline worlds.
However, using micro-batches as in the Spark Streaming APIs also introduces
issues such as disrupted session windows, out-of-order arrivals across
batch boundaries, etc. We can certainly dig into it more.
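
To make the out-of-order issue concrete, here is a small sketch of
fixed-size (tumbling) batches. It assumes, purely for illustration, that a
batch is closed as soon as processing time crosses its end, so an event
whose timestamp falls in one batch but which arrives during the next one
has missed its batch. The class and method names are hypothetical.

```java
/** Hypothetical sketch of fixed-size micro-batches keyed by event time. */
class MicroBatchSketch {

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timeMs, long windowSizeMs) {
        return timeMs - Math.floorMod(timeMs, windowSizeMs);
    }

    /** True if the event's own window had already closed when the event arrived. */
    static boolean isLate(long eventTimeMs, long arrivalTimeMs, long windowSizeMs) {
        return windowStart(arrivalTimeMs, windowSizeMs) > windowStart(eventTimeMs, windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 1000; // 1-second batches
        // Each row is {eventTime, arrivalTime} in milliseconds.
        long[][] events = { {900, 950}, {990, 1010}, {1500, 1600} };
        for (long[] e : events) {
            System.out.println("event@" + e[0] + " arrived@" + e[1]
                    + " window=" + windowStart(e[0], size)
                    + " late=" + isLate(e[0], e[1], size));
        }
    }
}
```

The second event belongs to the batch starting at 0 but arrives after that
batch has closed, so a micro-batch engine has to drop it, count it in the
wrong batch, or keep old batches open for some grace period.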


> Thank you,
> Alex
>