You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by "Rick Kellogg (JIRA)" <ji...@apache.org> on 2015/10/09 03:21:27 UTC

[jira] [Updated] (STORM-153) Scaling storm beyond 1.2k workers

     [ https://issues.apache.org/jira/browse/STORM-153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Kellogg updated STORM-153:
-------------------------------
    Component/s: storm-core

> Scaling storm beyond 1.2k workers
> ---------------------------------
>
>                 Key: STORM-153
>                 URL: https://issues.apache.org/jira/browse/STORM-153
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: James Xu
>
> https://github.com/nathanmarz/storm/issues/620
> Our storm & zookeeper cluster has reached a scaling limit, on the
> zookeeper we retain storm state, kafka commit offsets and trident
> transaction state. Our hardware is no joke (RAIDed 6 disks with 512mb
> cache). The reason I say it's reached scaling limit is because a few
> users have reported trying to deploy a topology, only to see the
> workers continually die and restart with executor not alive in Nimbus
> logs. Additionally, recently when a user deployed a topology with 140
> workers, not only did his topology enter into continuous workers death
> & restart, but at the same moment, half of production also went into
> this state and never recovered until I manually deactivated a bunch of
> production. So this person's topology caused previously stable workers
> to get executor not alive exceptions.
> We have a dedicated zookeeper SRE who says zkget, and zk list
> children, and the heartbeats of zookeeper ephemeral nodes are
> relatively very cheap operations, but zk create and set are expensive
> since they require quorum, and he's never seen a zookeeper cluster
> with so many quorum operations at any company he's worked at in the
> past. The rate of quorum ops tends to be at around 1.5k/s sustained
> rate. If you do the math, starting with the fact we have 1.2k workers,
> and that our workers are configured for 1 heartbeat a second, that
> implies 1.2k/s zksets per second which is 80% of the total number of
> quorum operations we're measuring via zookeeper metrics. Comparing our
> empirical throughput to this benchmark
> http://zookeeper.apache.org/doc/r3.1.2/zookeeperOver.html our real
> world results are not quite as good as what's claimed here, but in the
> same order of magnitude, storm also uses much larger zknode payload
> approaching 1mb in some cases. We need a way to scale to AT LEAST 10k
> workers, preferably 100k workers. So what are the solutions here?
> SOLUTION #1
> Ephemeral nodes are an obvious solution here. There's a few hidden
> complexities:
> #1 We're not actually reducing the peak quorum operation rate, because
>  creating an ephemeral node requires a quorum operation. Clearly it's
>  less load overall though, because you don't have sustained zksets
>  every second. However if hitting a peak quorum operation of 1.5k/s
>  were to cause zk sessions to die such as what we've observed on the
>  current cluster, then ephemeral don't solve this problem, because
>  they still require quorum to create. I think the only way to
>  determine if this will occur is to build it and extensive testing.
> I'm not sure what netflix curator does here, but IF the ephemeral
> nodes are immediately recreated when the session is re-established
> then you easily enter into cascading failure because that means higher
> reconnect rates are positively correlated with higher quorum ops, so
> this easily could enter into tailspin. A simple solution here is to
> NOT recreate the ephemeral nodes during a reconnect, and instead wait
> for the heartbeat to occur.
> #2 Ephemeral nodes are tied to zk session, so if you kill -9 a worker,
>  from my understanding there will be some latency in when the
>  ephemeral node is actually cleaned up. If the worker is restarted,
>  and tries to recreate the node during start-up the node will already
>  exist, so it should delete existing nodes on start-up. Re:
>  http://developers.blog.box.com/2012/04/10/a-gotcha-when-using-zookeeper-ephemeral-nodes/
>  (URL courtesy of Siddharth)
> #3 During a leader elections, I see all the zk sessions dying, and
>  curator code entering into a reconnecting phase. (This is true for
>  zookeeper 3.4 and 3.3, maybe not 3.5). Since ephemeral nodes have a
>  lifetime tied to zk sessions, all the ephemeral nodes will dissapear
>  during zk election. We don't want the whole storm cluster to die
>  during zk elections, so the nimbus algorithm must do more than just
>  kill workers when the ephemeral nodes dissapear. Instead you want a
>  different algorithm, something that waits for ephemeral nodes to have
>  dissapeared for a certain length of time.
> #5 Strictly speaking this solution is not horizontally scalable, it
>  scales with the number of nodes in the zookeeper cluster, but the
>  quorum op rate decreases when you scale up a zookeeper cluster. Not
>  sure what the upper bound is here and if this actually gets us to
>  100k or just 10k.
> #6 What code does the ping to keep the zookeeper session alive? Is
>  that zookeeper client code? because it should be ran in a high
>  priority thread. I've seen storm workers die from executor not alive
>  just because they have CPU load spikes and the heartbeat doesn't get
>  in, once I raised the priority of this thread it solved the
>  problem. So the ping thread should do the same.
> Anyone have experience with using ephemeral nodes as a health status
> metric, not merely for service discovery?
> Overall all these complexities seem manageable.
> SOLUTION #2
> You actually don't need state persistence, or quorum to do heartbeats,
> which is what zookeper is adding in causing complexity. Heartbeats
> could just be sent to a series of heartbeat-daemons that have no
> persistent storage using random distribution. Nimbus would then query
> the heartbeat-daemons to understand the state of the cluster. This is
> horizontally scalable, could scale to 100k. We would use zookeeper to
> do discovery of the heartbeat-daemons. This would substantially reduce
> the load on zookeeper by several orders of magnitude.
> This is similar to how mesos does heartbeats. Mesos actually has the
> 'master' receive heartbeats from all the slaves. They've scaled this
> to a couple thousand slaves, and are looking at partioning the
> heartbeats across the non-elected masters as well as the master to
> scale this up further. So that's another idea, to use all the nimbus
> masters to handle this. Which is nice because you don't have to add
> yet another daemon to the storm ecosystem.
> SOLUTION #3
> You can also decrease the heartbeat frequency from 1 second, to 10
> seconds. Presumbly this will let you scale 10x then. Any draw backs to
> this? I know the nimbus UI stats will update less frequently, it'll
> take slightly longer to find dead workers, is that it? It doesn't get
> us to 100x scale, which potentially the first 2 solutions could.
> ---------- rgs1: wrt to:
> " Ephemeral nodes are tied to zk session, so if you kill -9 a worker,
> from my understanding there will be some latency in when the ephemeral
> node is actually cleaned up. If the worker is restarted, and tries to
> recreate the node during start-up the node will already exist, so it
> should delete existing nodes on start-up."
> this is trivially cheap on most cases, where the session will be gone
> for good, since you would just call exists() before the delete() which
> is not a quorom operation (but served from memory on whatever host you
> are connected to).
> ---------- d2r: +1
> storm also uses much larger zknode payload approaching 1mb in some
> cases.  #604 : I think this might happen depending on the number of
> workers & parallelism of the topology itself, not the number of
> workers in the cluster.
> ---------- revans2: We ran into similar scaling issues too. We have
> not done anything permanent yet, except reconfigure the nodes that ZK
> is running on to be able to handle more IOPS, which seems to be the
> primary limiting factor for us. Although we are not at the same scale
> that you are yet so the number of network connections to ZK may start
> to be more of an issue soon.
> I did some research into the size of the data sent to ZK by our
> instance of storm. I found that the data is rather compressible and
> with a simple GzipOutputStream I could reduce the size to 1/5th the
> original at the cost of some CPU. The 1MB limit that ZK has is also
> configurable -Djute.maxbuffer=<NUM_BYTES>, although it has to be
> configured for all the ZK nodes, the supervisors and all the workers
> too because it is a limit when reading jute encoded data (what ZK uses
> for serialization).
> Solution 3 is how Hadoop, Mesos, and many other large systems
> work. But it requires a new solution to persist state and will need a
> lot of work to be resistant to multiple failures, which are things
> that ZK gives you for free. Most of these large systems that heartbeat
> to a central server either have no state recovery on failure, or if
> they do it was added in very recently. Because the zk layer is
> abstracted away from the code that reads/persists the state we could
> make it pluggable and play around with other key/value stores. We
> could still leave ZK for things that need it like leader election.
> Another thing we thought about doing was creating a RAM disk and
> configuring ZK to write edit logs to the RAM disk, along with very
> aggressive rolling and cleanup of those logs. This would only work if
> IOPS is really the bottleneck for you and not something else in ZK
> like lock contention or CPU.
> ---------- rgs1: One detail of implementing heartbeats by ephemeral
> znodes (so the actual heart-beating is delegate to ZooKeeper pings
> which are local to each cluster node - not a quorum operation - hence
> very cheap) is how to deal with the transient nodes of ephemeral
> znodes. Because of network hiccups (and clients issues) ephemeral
> nodes can come and go. So this means workers could be killed/started
> when flaps happen. We could mitigate this (pretty much entirely) by
> not reacting immediately to ephemeral nodes appearing/disappearing but
> keeping a local tab of the available nodes and only remove them when
> they've been gone for a while (and potentially only consider them
> alive when they've been around for a while). This shouldn't be
> expensive to implement and in terms of Zk it means polling via
> continous getChildren() calls which are cheap and local (to a cluster
> node).
> ---------- d2r: Regarding the discussion on storm-user, we'd like to
> secure heartbeats such that workers of one topology cannot access
> those of another topology. So what follows is the requirements we have
> and how we're approaching it. Maybe this could be valuable to keep in
> mind as we're discussing the new design, so that the design will not
> make it unnecessarily difficult to add confidentiality guarantees to
> heartbeats later on.
> I think it is sufficient to guarantee the following:
> storm cluster can do anything it wants topology workers can only read
> heartbeats of workers in that same topology topology workers can only
> write to or delete heartbeats of workers in that same topology.  We're
> trying out ZooKeeper ACLs with MD5-digest authentication in the
> near-term. The only tricky thing really is getting around the fact
> that child znodes do not inherit from parents, and so they must be
> explicitly set on each znode.
> This complication arises in two cases with the current implementation:
> when nimbus cleans up worker heartbeats in ZK and must be authorized
> to remove those nodes and any children the worker may no longer clean
> up its own heartbeats, as allowing DELETE on the parent node (created
> by nimbus) would also allow any other worker to delete the workers
> heartbeats We have the worker set an ACL with two entries, one to
> allow the worker itself full access, and one that uses the digest of
> the cluster's credentials to allow full access to the cluster's user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)