Posted to dev@storm.apache.org by danny0405 <gi...@git.apache.org> on 2017/09/11 09:08:45 UTC

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

GitHub user danny0405 opened a pull request:

    https://github.com/apache/storm/pull/2319

    [STORM-2693] Nimbus assignments promotion

    Storm currently does not handle large clusters [for example, thousands of supervisors] very well. In our production environment, topology submission and killing become very inefficient as the cluster grows large. I checked the current assignment strategy and found that it can be improved.
    
    The assignment promotion works as follows:
    1. Nimbus stores the assignments on local disk.
    2. On restart, or when an HA leader change is triggered, nimbus recovers the assignments from zk to local disk.
    3. In every scheduling round, nimbus tells each supervisor its assignments through RPC [only nodes whose assignments changed are notified].
    4. Apart from the nimbus notifications, supervisors sync assignments at a fixed interval [through an RPC request to nimbus].
    5. Workers sync assignments only from the local supervisor [or from ZooKeeper when the local supervisor is down].
    
    <img width="603" alt="2fa30cd8-af15-4352-992d-a67bd724e7fb" src="https://user-images.githubusercontent.com/7644508/30267044-d0758492-9713-11e7-87cc-09af890aced9.png">
    
    I have tested it in our cluster. With the new RPC-based assignment distribution mode, supervisors respond to assignment changes very fast [milliseconds] and efficiently [only nodes whose assignments changed are notified]. It also keeps the full robustness of the old ZooKeeper mode:
    1. When nimbus goes down, workers keep working [as before]; when the leader starts up, it syncs the assignments and starts to work again.
    2. When a supervisor goes down, workers still work fine [they sync connections from zk as before]; when the supervisor comes back up, it just syncs the assignments from nimbus.
    3. When zk is unstable, assignment sync is not affected; only the heartbeats and the logical plan [StormBase or similar] are.
    
    This is my JIRA task: https://issues.apache.org/jira/browse/STORM-2693
    
    @HeartSaVioR can you please help me to review this?
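
To illustrate step 3 of the description above (only nodes whose assignments changed are notified), here is a minimal Java sketch of computing the changed-node set. It assumes a heavily simplified model in which an assignment is just a map from an executor id to a node name. `ChangedNodes` and its methods are hypothetical and are not taken from the PR, whose real logic lives in the Clojure helper `assignment-changed-nodes`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for illustration only; not part of the Storm code base. */
final class ChangedNodes {
    private ChangedNodes() {}

    /** Groups an executor -> node map into node -> set of executors. */
    static Map<String, Set<String>> byNode(Map<String, String> executorToNode) {
        Map<String, Set<String>> grouped = new HashMap<>();
        for (Map.Entry<String, String> e : executorToNode.entrySet()) {
            grouped.computeIfAbsent(e.getValue(), k -> new HashSet<>()).add(e.getKey());
        }
        return grouped;
    }

    /** Returns the nodes whose executor set differs between the two assignments. */
    static Set<String> changedNodes(Map<String, String> oldAssignment,
                                    Map<String, String> newAssignment) {
        Map<String, Set<String>> before = byNode(oldAssignment);
        Map<String, Set<String>> after = byNode(newAssignment);

        // Consider every node that appears on either side.
        Set<String> allNodes = new TreeSet<>(before.keySet());
        allNodes.addAll(after.keySet());

        Set<String> changed = new TreeSet<>();
        for (String node : allNodes) {
            Set<String> was = before.getOrDefault(node, Collections.<String>emptySet());
            Set<String> is = after.getOrDefault(node, Collections.<String>emptySet());
            if (!was.equals(is)) {
                changed.add(node);
            }
        }
        return changed;
    }
}
```

Only the nodes returned by `changedNodes` would need an RPC notification in a given round; every other supervisor would pick up state through its periodic sync.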

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/danny0405/storm schedule-promotion

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2319.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2319
    
----
commit 9e7cde8c48e33811615ab4d36e1f5dad94e8499c
Author: chenyuzhao <ch...@meituan.com>
Date:   2017-09-11T03:46:23Z

    add local assignment backend and assign assignemnts through RPC

----


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138092651
  
    --- Diff: storm-core/pom.xml ---
    @@ -331,6 +331,10 @@
                 <artifactId>commons-codec</artifactId>
             </dependency>
             <dependency>
    +            <groupId>org.rocksdb</groupId>
    +            <artifactId>rocksdbjni</artifactId>
    +        </dependency>
    --- End diff --
    
    OK, this is an issue. I chose RocksDB for its good performance, but the jar conflicts really are a concern.


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138073996
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---
    @@ -590,6 +583,7 @@
                                     (set (keys new-assignment)))]
             (.killedWorker isupervisor (int p)))
           (.assigned isupervisor (keys new-assignment))
    +      ;;TODO: remove one copy of assignmnets.
    --- End diff --
    
    All of the TODO comments need to be removed.  If you want to do them, then either do them or file a follow on JIRA to do them.


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138073574
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---
    @@ -62,26 +63,17 @@
     
     (defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
       (let [storm-ids (.assignments storm-cluster-state callback)]
    -    (let [new-assignments
    -          (->>
    -           (dofor [sid storm-ids]
    -                  (let [recorded-version (:version (get assignment-versions sid))]
    -                    (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
    -                      (if (= assignment-version recorded-version)
    -                        {sid (get assignment-versions sid)}
    -                        {sid (.assignment-info-with-version storm-cluster-state sid callback)})
    -                      {sid nil})))
    -           (apply merge)
    -           (filter-val not-nil?))
    +    (let [new-assignments (.assignments-info storm-cluster-state)
               new-profiler-actions
               (->>
                 (dofor [sid (distinct storm-ids)]
                        (if-let [topo-profile-actions (.get-topology-profile-requests storm-cluster-state sid false)]
                           {sid topo-profile-actions}))
                (apply merge))]
              
    -      {:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
    +      {:assignments new-assignments
            :profiler-actions new-profiler-actions
    +       ;; TODO: remove versions
    --- End diff --
    
    Are you going to remove the TODO or not?


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138663085
  
    --- Diff: storm-core/src/jvm/org/apache/storm/assignments/AssignmentDistributionService.java ---
    @@ -0,0 +1,232 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.assignments;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.generated.SupervisorAssignments;
    +import org.apache.storm.utils.SupervisorClient;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Random;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * <p>A service for distributing master assignments to supervisors, this service makes the assignments notification asynchronous.
    + * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer.
    + * <p>Master will shuffle its node request to the queues, if the target queue is full, we just discard the request, let the supervisors sync instead.
    + * <p/>
    + * <pre>{@code
    + * Working mode
    + *                      +--------+         +-----------------+
    + *                      | queue1 |   ==>   | Working thread1 |
    + * +--------+ shuffle   +--------+         +-----------------+
    + * | Master |   ==>
    + * +--------+           +--------+         +-----------------+
    + *                      | queue2 |   ==>   | Working thread2 |
    + *                      +--------+         +-----------------+
    + * }
    + * </pre>
    + */
    +public class AssignmentDistributionService implements Closeable {
    +    private static final Logger LOG = LoggerFactory.getLogger(AssignmentDistributionService.class);
    +    private ExecutorService service;
    +
    +    /**
    +     * Flag to indicate if the service is active
    +     */
    +    private volatile boolean active = false;
    +
    +    private Random random;
    +    /**
    +     * Working threads num.
    +     */
    +    private int threadsNum = 0;
    +    /**
    +     * Working thread queue size.
    +     */
    +    private int queueSize = 0;
    +
    +    /**
    +     * Assignments request queue.
    +     */
    +    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue;
    +
    +    private Map conf;
    +
    +    /**
    +     * Function for initialization.
    +     *
    +     * @param conf
    +     */
    +    public void prepare(Map conf) {
    +        this.conf = conf;
    +        this.random = new Random(47);
    +
    +        this.threadsNum = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
    +        this.queueSize = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100);
    +
    +        this.assignmentsQueue = new HashMap<>();
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize));
    +        }
    +        //start the thread pool
    +        this.service = Executors.newFixedThreadPool(threadsNum);
    +        this.active = true;
    +        //start the threads
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.service.submit(new DistributeTask(this, i));
    +        }
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        this.active = false;
    +        this.service.shutdownNow();
    +        try {
    +            this.service.awaitTermination(10l, TimeUnit.SECONDS);
    +        } catch (InterruptedException e) {
    +            LOG.error("Failed to close assignments distribute service");
    +        }
    +        this.assignmentsQueue = null;
    +    }
    +
    +    public void addAssignmentsForNode(String node, SupervisorAssignments assignments) {
    +        try {
    +            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, assignments), 5l, TimeUnit.SECONDS);
    +            if (!success) {
    +                LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full", node);
    +            }
    +
    +        } catch (InterruptedException e) {
    +            LOG.error("Add node assignments interrupted: {}", e.getMessage());
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    static class NodeAssignments {
    +        private String node;
    +        private SupervisorAssignments assignments;
    +
    +        private NodeAssignments(String node, SupervisorAssignments assignments) {
    +            this.node = node;
    +            this.assignments = assignments;
    +        }
    +
    +        public static NodeAssignments getInstance(String node, SupervisorAssignments assignments) {
    +            return new NodeAssignments(node, assignments);
    +        }
    +
    +        public String getNode() {
    +            return this.node;
    +        }
    +
    +        public SupervisorAssignments getAssignments() {
    +            return this.assignments;
    +        }
    +
    +    }
    +
    +    /**
    +     * Task to distribute assignments.
    +     */
    +    static class DistributeTask implements Runnable {
    +        private AssignmentDistributionService service;
    +        private Integer queueIndex;
    +
    +        DistributeTask(AssignmentDistributionService service, Integer index) {
    +            this.service = service;
    +            this.queueIndex = index;
    +        }
    +
    +        @Override
    +        public void run() {
    +            while (true) {
    +                try {
    +                    NodeAssignments nodeAssignments = this.service.nextAssignments(queueIndex);
    +                    sendAssignmentsToNode(nodeAssignments);
    +                } catch (InterruptedException e) {
    +                    if (service.isActive()) {
    +                        LOG.error("Get an unexpected interrupt when distributing assignments to node, {}", e.getCause());
    +                    } else {
    +                        // service is off now just interrupt it.
    +                        Thread.currentThread().interrupt();
    +                    }
    +                }
    +            }
    +        }
    +
    +        private void sendAssignmentsToNode(NodeAssignments assignments) {
    +            SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(), assignments.getNode());
    --- End diff --
    
    Can we do something where if the send is too old that we don't do it?
    
    Because we have 2 ways to get this information to the client I can see us having race conditions where a supervisor sees State A followed by State B and then back again to State A.  I would prefer to avoid something like that, and timeout the send if it is more than 10 seconds old, because the supervisor should be checking every 10 seconds anyways.
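
The suggestion above (skip a send that has waited too long) could be sketched roughly as follows. This is an illustration under assumptions, not code from the PR: `StaleSendFilter`, its `Request` type, and the injected `nowNanos` clock value are hypothetical, and the 10-second limit simply mirrors the supervisor sync interval mentioned in the comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: drop queued pushes that are older than a maximum age. */
final class StaleSendFilter {
    /** Assumed limit, matching the 10-second supervisor sync mentioned in the review. */
    static final long MAX_AGE_NANOS = TimeUnit.SECONDS.toNanos(10);

    private StaleSendFilter() {}

    /** A queued push, stamped with the time it entered the queue. */
    static final class Request {
        final String node;
        final long enqueuedNanos;

        Request(String node, long enqueuedNanos) {
            this.node = node;
            this.enqueuedNanos = enqueuedNanos;
        }
    }

    static boolean isStale(Request request, long nowNanos) {
        return nowNanos - request.enqueuedNanos > MAX_AGE_NANOS;
    }

    /** Drains the queue and returns the nodes that should still receive a push. */
    static List<String> drainFresh(Queue<Request> queue, long nowNanos) {
        List<String> toSend = new ArrayList<>();
        Request request;
        while ((request = queue.poll()) != null) {
            if (!isStale(request, nowNanos)) {
                toSend.add(request.node);
            }
            // Stale requests are dropped; the periodic supervisor sync covers them.
        }
        return toSend;
    }
}
```

In a real worker loop the clock value would come from `System.nanoTime()`, taken once when the request is queued and again just before the send.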


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138093197
  
    --- Diff: storm-core/pom.xml ---
    @@ -331,6 +331,10 @@
                 <artifactId>commons-codec</artifactId>
             </dependency>
             <dependency>
    +            <groupId>org.rocksdb</groupId>
    +            <artifactId>rocksdbjni</artifactId>
    +        </dependency>
    --- End diff --
    
    If you move the rocksdb implementation to a separate jar that people can add in on their own and set the default implementation to be in memory, then it should not be a big deal.
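
One way to read the suggestion above is a small storage interface with an in-memory default, where any other implementation (for example a RocksDB one shipped in a separate jar) is loaded by class name. The sketch below is only an illustration: `AssignmentStore`, `InMemoryAssignmentStore`, and `AssignmentStores.create` are hypothetical names and do not reflect the PR's actual backend API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical storage interface; names are illustrative, not the PR's API. */
interface AssignmentStore {
    void put(String topologyId, byte[] assignment);

    byte[] get(String topologyId);

    void remove(String topologyId);
}

/** Default implementation that needs no extra dependencies. */
final class InMemoryAssignmentStore implements AssignmentStore {
    private final Map<String, byte[]> data = new ConcurrentHashMap<>();

    @Override
    public void put(String topologyId, byte[] assignment) {
        data.put(topologyId, assignment);
    }

    @Override
    public byte[] get(String topologyId) {
        return data.get(topologyId);
    }

    @Override
    public void remove(String topologyId) {
        data.remove(topologyId);
    }
}

/** Picks the store: in-memory unless a class name is configured. */
final class AssignmentStores {
    private AssignmentStores() {}

    static AssignmentStore create(String className) {
        if (className == null || className.isEmpty()) {
            return new InMemoryAssignmentStore();
        }
        try {
            // The optional implementation must have a no-argument constructor.
            return (AssignmentStore) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load assignment store " + className, e);
        }
    }
}
```

With this shape the core module never references the optional library directly, so its jar is only needed on the classpath when the corresponding class name is configured.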


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138099459
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---
    @@ -961,14 +1015,25 @@
         ;; tasks figure out what tasks to talk to by looking at topology at runtime
         ;; only log/set when there's been a change to the assignment
         (doseq [[topology-id assignment] new-assignments
    -            :let [existing-assignment (get existing-assignments topology-id)
    -                  topology-details (.getById topologies topology-id)]]
    +            :let [existing-assignment (get existing-assignments topology-id)]]
           (if (= existing-assignment assignment)
             (log-debug "Assignment for " topology-id " hasn't changed")
             (do
               (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
               (.set-assignment! storm-cluster-state topology-id assignment)
               )))
    +
    +    ;; grouping assignment by node to see the nodes diff, then notify nodes/supervisors to synchronize its owned assignment
    +    ;; because the number of existing assignments is small for every scheduling round,
    +    ;; we expect to notify supervisors at almost the same time.
    +    (->> new-assignments
    +         (map (fn [[tid new-assignment]]
    +           (let [existing-assignment (get existing-assignments tid)]
    +             (assignment-changed-nodes existing-assignment new-assignment ))))
    +         (apply concat)
    +         (into #{})
    +         (notify-supervisors-assignments conf new-assignments))
    --- End diff --
    
    So then if we can put in a thread pool with a timeout of 5 seconds or so for each push attempt we should be able to get the best of both worlds. 
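
A rough sketch of the idea above, a push attempt that runs on a thread pool and is abandoned after a timeout, might look like this. `TimedPush` is a hypothetical helper, and the push itself is modeled as a plain `Callable<Void>` rather than the PR's supervisor RPC client.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper, not part of the PR: bounds each push attempt by a timeout. */
final class TimedPush {
    private TimedPush() {}

    /**
     * Runs one push on the pool and waits at most the given timeout for it.
     * Returns true only if the push finished in time without throwing.
     */
    static boolean pushWithTimeout(ExecutorService pool, Callable<Void> push,
                                   long timeout, TimeUnit unit) {
        Future<Void> future = pool.submit(push);
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            // Too slow: interrupt the attempt and rely on the supervisor's own sync.
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            // The push itself failed, for example because the node was unreachable.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return false;
        }
    }
}
```

A failed or timed-out push is not retried here, on the assumption that the periodic supervisor sync picks up the assignment shortly afterwards.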


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138657756
  
    --- Diff: storm-core/src/clj/org/apache/storm/cluster.clj ---
    @@ -123,6 +133,7 @@
     (def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
     (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
     (def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT))
    +(def LEADERINFO-SUBTREE (str "/" LEADERINFO-ROOT))
    --- End diff --
    
    Is this needed?  we already have a way of getting the leader that is built into the client.  Why do we need a second way?


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138103588
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---
    @@ -961,14 +1015,25 @@
         ;; tasks figure out what tasks to talk to by looking at topology at runtime
         ;; only log/set when there's been a change to the assignment
         (doseq [[topology-id assignment] new-assignments
    -            :let [existing-assignment (get existing-assignments topology-id)
    -                  topology-details (.getById topologies topology-id)]]
    +            :let [existing-assignment (get existing-assignments topology-id)]]
           (if (= existing-assignment assignment)
             (log-debug "Assignment for " topology-id " hasn't changed")
             (do
               (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
               (.set-assignment! storm-cluster-state topology-id assignment)
               )))
    +
    +    ;; grouping assignment by node to see the nodes diff, then notify nodes/supervisors to synchronize its owned assignment
    +    ;; because the number of existing assignments is small for every scheduling round,
    +    ;; we expect to notify supervisors at almost the same time.
    +    (->> new-assignments
    +         (map (fn [[tid new-assignment]]
    +           (let [existing-assignment (get existing-assignments tid)]
    +             (assignment-changed-nodes existing-assignment new-assignment ))))
    +         (apply concat)
    +         (into #{})
    +         (notify-supervisors-assignments conf new-assignments))
    --- End diff --
    
    Sounds really good, I will try to make it better.


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    @danny0405 
    To be clear, all pull requests should be merged into master first, and then merged back to the 1.x version line. So to integrate your great effort into Apache Storm, we need a PR against the master branch. Actually, the transition to Java makes it easier for me to review, given that I'm still not familiar with Clojure.


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138090333
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---
    @@ -62,26 +63,17 @@
     
     (defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
       (let [storm-ids (.assignments storm-cluster-state callback)]
    -    (let [new-assignments
    -          (->>
    -           (dofor [sid storm-ids]
    -                  (let [recorded-version (:version (get assignment-versions sid))]
    -                    (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
    -                      (if (= assignment-version recorded-version)
    -                        {sid (get assignment-versions sid)}
    -                        {sid (.assignment-info-with-version storm-cluster-state sid callback)})
    -                      {sid nil})))
    -           (apply merge)
    -           (filter-val not-nil?))
    +    (let [new-assignments (.assignments-info storm-cluster-state)
               new-profiler-actions
               (->>
                 (dofor [sid (distinct storm-ids)]
                        (if-let [topo-profile-actions (.get-topology-profile-requests storm-cluster-state sid false)]
                           {sid topo-profile-actions}))
                (apply merge))]
              
    -      {:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
    +      {:assignments new-assignments
            :profiler-actions new-profiler-actions
    +       ;; TODO: remove versions
    --- End diff --
    
    Yes, I'm going to remove the TODO.


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138791919
  
    --- Diff: storm-core/src/jvm/org/apache/storm/assignments/AssignmentDistributionService.java ---
    @@ -0,0 +1,232 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.assignments;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.generated.SupervisorAssignments;
    +import org.apache.storm.utils.SupervisorClient;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Random;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * <p>A service for distributing master assignments to supervisors, this service makes the assignments notification asynchronous.
    + * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer.
    + * <p>Master will shuffle its node request to the queues, if the target queue is full, we just discard the request, let the supervisors sync instead.
    + * <p/>
    + * <pre>{@code
    + * Working mode
    + *                      +--------+         +-----------------+
    + *                      | queue1 |   ==>   | Working thread1 |
    + * +--------+ shuffle   +--------+         +-----------------+
    + * | Master |   ==>
    + * +--------+           +--------+         +-----------------+
    + *                      | queue2 |   ==>   | Working thread2 |
    + *                      +--------+         +-----------------+
    + * }
    + * </pre>
    + */
    +public class AssignmentDistributionService implements Closeable {
    +    private static final Logger LOG = LoggerFactory.getLogger(AssignmentDistributionService.class);
    +    private ExecutorService service;
    +
    +    /**
    +     * Flag to indicate if the service is active
    +     */
    +    private volatile boolean active = false;
    +
    +    private Random random;
    +    /**
    +     * Working threads num.
    +     */
    +    private int threadsNum = 0;
    +    /**
    +     * Working thread queue size.
    +     */
    +    private int queueSize = 0;
    +
    +    /**
    +     * Assignments request queue.
    +     */
    +    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue;
    +
    +    private Map conf;
    +
    +    /**
    +     * Function for initialization.
    +     *
    +     * @param conf
    +     */
    +    public void prepare(Map conf) {
    +        this.conf = conf;
    +        this.random = new Random(47);
    +
    +        this.threadsNum = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
    +        this.queueSize = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100);
    +
    +        this.assignmentsQueue = new HashMap<>();
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize));
    +        }
    +        //start the thread pool
    +        this.service = Executors.newFixedThreadPool(threadsNum);
    +        this.active = true;
    +        //start the threads
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.service.submit(new DistributeTask(this, i));
    +        }
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        this.active = false;
    +        this.service.shutdownNow();
    +        try {
    +            this.service.awaitTermination(10l, TimeUnit.SECONDS);
    +        } catch (InterruptedException e) {
    +            LOG.error("Failed to close assignments distribute service");
    +        }
    +        this.assignmentsQueue = null;
    +    }
    +
    +    public void addAssignmentsForNode(String node, SupervisorAssignments assignments) {
    +        try {
    +            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, assignments), 5l, TimeUnit.SECONDS);
    +            if (!success) {
    +                LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full", node);
    +            }
    +
    +        } catch (InterruptedException e) {
    +            LOG.error("Add node assignments interrupted: {}", e.getMessage());
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    static class NodeAssignments {
    +        private String node;
    +        private SupervisorAssignments assignments;
    +
    +        private NodeAssignments(String node, SupervisorAssignments assignments) {
    +            this.node = node;
    +            this.assignments = assignments;
    +        }
    +
    +        public static NodeAssignments getInstance(String node, SupervisorAssignments assignments) {
    +            return new NodeAssignments(node, assignments);
    +        }
    +
    +        public String getNode() {
    +            return this.node;
    +        }
    +
    +        public SupervisorAssignments getAssignments() {
    +            return this.assignments;
    +        }
    +
    +    }
    +
    +    /**
    +     * Task to distribute assignments.
    +     */
    +    static class DistributeTask implements Runnable {
    +        private AssignmentDistributionService service;
    +        private Integer queueIndex;
    +
    +        DistributeTask(AssignmentDistributionService service, Integer index) {
    +            this.service = service;
    +            this.queueIndex = index;
    +        }
    +
    +        @Override
    +        public void run() {
    +            while (true) {
    +                try {
    +                    NodeAssignments nodeAssignments = this.service.nextAssignments(queueIndex);
    +                    sendAssignmentsToNode(nodeAssignments);
    +                } catch (InterruptedException e) {
    +                    if (service.isActive()) {
    +                        LOG.error("Get an unexpected interrupt when distributing assignments to node, {}", e.getCause());
    +                    } else {
    +                        // service is off now just interrupt it.
    +                        Thread.currentThread().interrupt();
    +                    }
    +                }
    +            }
    +        }
    +
    +        private void sendAssignmentsToNode(NodeAssignments assignments) {
    +            SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(), assignments.getNode());
    --- End diff --
    
    Actually, we cannot avoid state inconsistency if we make it asynchronous; either we send a confirmation message to the supervisor or we remove the old state in nimbus memory [because the network transfer takes time]. Most of the time [when the RPC to the supervisor returns fast] we do not have any issue.
    
    If some nodes are unreachable for some time, we have the client timeout, and the supervisor sync also works to make the state eventually correct. This is OK for killing, submitting, or rebalancing a topology.
    
    Also I have configured the supervisor client timeout to 5 seconds.
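
For readers following the State A, State B, State A concern raised earlier in the thread, one common guard is a version check on the receiving side. The sketch below is not something the PR is stated to do. It assumes nimbus could stamp each assignment with a monotonically increasing version, and `VersionedAssignmentHolder` is a hypothetical class.

```java
/** Hypothetical receiver-side guard: ignore updates that are not newer than the current one. */
final class VersionedAssignmentHolder {
    private long version = -1L;
    private String state;

    /**
     * Applies the update only if its version is strictly newer.
     * Returns true if the update was applied, false if it was ignored.
     */
    synchronized boolean offer(long incomingVersion, String incomingState) {
        if (incomingVersion <= version) {
            // A delayed push or an older sync result arrived late; keep the newer state.
            return false;
        }
        version = incomingVersion;
        state = incomingState;
        return true;
    }

    synchronized String state() {
        return state;
    }

    synchronized long version() {
        return version;
    }
}
```

With such a guard, a push and a periodic sync can arrive in either order and the holder still ends on the newest state it has seen.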


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138781061
  
    --- Diff: storm-core/src/clj/org/apache/storm/cluster.clj ---
    @@ -244,15 +255,19 @@
     
     ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
     (defnk mk-storm-cluster-state
    -  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
    -  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
    -                                [false cluster-state-spec]
    -                                [true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
    +  [conf :cluster-state nil :acls nil :context (ClusterStateContext.) :backend nil]
    +  (let [[solo? cluster-state] (if (and (not-nil? cluster-state) (instance? ClusterState cluster-state))
    +                                [false cluster-state]
    +                                [true (mk-distributed-cluster-state conf :auth-conf conf :acls acls :context context)])
    +        assignments-backend (if (nil? backend)
    +                              (doto (InMemoryAssignmentBackend.) (.prepare nil nil))
    --- End diff --
    
    Okay, I will make it configurable.


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138080750
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---
    @@ -961,14 +1015,25 @@
         ;; tasks figure out what tasks to talk to by looking at topology at runtime
         ;; only log/set when there's been a change to the assignment
         (doseq [[topology-id assignment] new-assignments
    -            :let [existing-assignment (get existing-assignments topology-id)
    -                  topology-details (.getById topologies topology-id)]]
    +            :let [existing-assignment (get existing-assignments topology-id)]]
           (if (= existing-assignment assignment)
             (log-debug "Assignment for " topology-id " hasn't changed")
             (do
               (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
               (.set-assignment! storm-cluster-state topology-id assignment)
               )))
    +
    +    ;; grouping assignment by node to see the nodes diff, then notify nodes/supervisors to synchronize its owned assignment
    +    ;; because the number of existing assignments is small for every scheduling round,
    +    ;; we expect to notify supervisors at almost the same time.
    +    (->> new-assignments
    +         (map (fn [[tid new-assignment]]
    +           (let [existing-assignment (get existing-assignments tid)]
    +             (assignment-changed-nodes existing-assignment new-assignment ))))
    +         (apply concat)
    +         (into #{})
    +         (notify-supervisors-assignments conf new-assignments))
    --- End diff --
    
    I am very concerned about this.  One of our biggest issues is the timing of this loop.  It blocks scheduling a new topology, rebalancing a topology, and recovering from a failed worker or a failed supervisor.
    
    One of the key tenets of Hadoop has been to never let the main daemon reach out to the thousands of other processes that may be on the cluster, because if one of them blocks, or there are network issues or whatever, it can slow down the rest of the cluster.
    
    I would really prefer to see a different model here.  At a minimum we need to have a thread pool that is used to notify the supervisors asynchronously, or even better we never actually reach out to the supervisors.  We just set something up in memory on nimbus and have the supervisors poll very frequently.  Nimbus is mostly single threaded so using more CPU is not a big deal.  We know that even 1,000 times a second for the DRPC servers is not really any load at all, so doing it a few times a second would not matter that much. 
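
A minimal sketch of the polling model suggested above, assuming Nimbus keeps a versioned in-memory entry per node and each supervisor asks only for changes since the version it last saw. The class `AssignmentPollSketch`, its methods, and the use of a plain `String` for the assignment payload are all hypothetical and are not part of Storm.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: the master only writes to memory; supervisors poll for changes. */
class AssignmentPollSketch {
    /** Immutable snapshot of one node's assignment plus a version counter. */
    static final class Versioned {
        final long version;
        final String assignment;

        Versioned(long version, String assignment) {
            this.version = version;
            this.assignment = assignment;
        }
    }

    private final Map<String, Versioned> byNode = new ConcurrentHashMap<>();

    /** Called from the scheduling loop: touches memory only, never the network. */
    void publish(String node, String assignment) {
        byNode.merge(node, new Versioned(1L, assignment),
                (old, fresh) -> new Versioned(old.version + 1, fresh.assignment));
    }

    /** Called when a supervisor polls; returns null when nothing has changed. */
    Versioned poll(String node, long lastSeenVersion) {
        Versioned current = byNode.get(node);
        if (current == null || current.version == lastSeenVersion) {
            return null;
        }
        return current;
    }

    public static void main(String[] args) {
        AssignmentPollSketch cache = new AssignmentPollSketch();
        cache.publish("node-1", "topology-a on port 6700");
        Versioned update = cache.poll("node-1", 0L);
        System.out.println("version=" + update.version + " assignment=" + update.assignment);
        System.out.println("unchanged poll returns null: " + (cache.poll("node-1", update.version) == null));
    }
}
```

With this shape a slow or unreachable supervisor cannot block the scheduling loop, because the loop never initiates a connection; the cost is one cheap map lookup per poll.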


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138074478
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---
    @@ -758,6 +752,35 @@
           (catch Exception e
             (log-error e "Error running profiler actions, will retry again later")))))
     
    +(defn assigned-assignments-to-local!
    +  [^SupervisorAssignments supervisorAssignments supervisor]
    +  (when (not-nil? supervisorAssignments)
    +    (let [serialized-assignments (into {} (for [[tid amt] (.get_storm_assignment supervisorAssignments)]
    +                                            {tid (Utils/serialize amt)}))]
    +      (.sync-remote-assignments! (:storm-cluster-state supervisor) serialized-assignments))))
    +
    +;; Supervisor should be told that who is leader.
    +;; Fetch leader info each time before request node assignment.
    +;; TODO: get leader address from zk directly.
    +(defn assignments-from-master
    +  [conf supervisor]
    +  (let [client (atom nil)]
    +    (try
    +      (let [master-client (NimbusClient/getConfiguredClientAs conf nil)
    +            _ (reset! client master-client) ;; keep a refence so we can close it
    +            supervisor-assignments (.getSupervisorAssignments (.getClient master-client) (:my-hostname supervisor))]
    +        (assigned-assignments-to-local! supervisor-assignments supervisor))
    +      (catch Throwable e
    --- End diff --
    
    Let's just catch Exception and not Throwable.  There are way too many really bad things that are Errors that we don't want to ignore.
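
To illustrate the difference in plain Java (the reviewed code is Clojure, but the same JVM rules apply), here is a small sketch. `CatchScopeSketch` and `runGuarded` are hypothetical names used only for this example.

```java
/** Hypothetical sketch: catch Exception so that JVM Errors still propagate. */
class CatchScopeSketch {
    /** Runs the action; returns false on a recoverable Exception, lets any Error escape. */
    static boolean runGuarded(Runnable action) {
        try {
            action.run();
            return true;
        } catch (Exception e) {
            // Recoverable failure: report it and let the next sync round retry.
            System.err.println("sync failed, will retry: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // An ordinary failure is swallowed and reported as false.
        System.out.println(runGuarded(() -> { throw new IllegalStateException("connection refused"); }));
        // An Error is not caught by `catch (Exception e)` and reaches the caller.
        try {
            runGuarded(() -> { throw new OutOfMemoryError("simulated"); });
        } catch (OutOfMemoryError e) {
            System.out.println("Error propagated: " + e.getMessage());
        }
    }
}
```

Because `Error` is not a subclass of `Exception`, conditions such as `OutOfMemoryError` or `StackOverflowError` are not silently ignored by the retry logic.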


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138781163
  
    --- Diff: storm-core/src/jvm/org/apache/storm/assignments/AssignmentDistributionService.java ---
    @@ -0,0 +1,232 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.assignments;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.generated.SupervisorAssignments;
    +import org.apache.storm.utils.SupervisorClient;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Random;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * <p>A service for distributing master assignments to supervisors, this service makes the assignments notification asynchronous.
    + * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer.
    + * <p>Master will shuffle its node request to the queues, if the target queue is full, we just discard the request, let the supervisors sync instead.
    + * <p/>
    + * <pre>{@code
    + * Working mode
    + *                      +--------+         +-----------------+
    + *                      | queue1 |   ==>   | Working thread1 |
    + * +--------+ shuffle   +--------+         +-----------------+
    + * | Master |   ==>
    + * +--------+           +--------+         +-----------------+
    + *                      | queue2 |   ==>   | Working thread2 |
    + *                      +--------+         +-----------------+
    + * }
    + * </pre>
    + */
    +public class AssignmentDistributionService implements Closeable {
    +    private static final Logger LOG = LoggerFactory.getLogger(AssignmentDistributionService.class);
    +    private ExecutorService service;
    +
    +    /**
    +     * Flag to indicate if the service is active
    +     */
    +    private volatile boolean active = false;
    +
    +    private Random random;
    +    /**
    +     * Working threads num.
    +     */
    +    private int threadsNum = 0;
    +    /**
    +     * Working thread queue size.
    +     */
    +    private int queueSize = 0;
    +
    +    /**
    +     * Assignments request queue.
    +     */
    +    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue;
    +
    +    private Map conf;
    +
    +    /**
    +     * Function for initialization.
    +     *
    +     * @param conf
    +     */
    +    public void prepare(Map conf) {
    +        this.conf = conf;
    +        this.random = new Random(47);
    +
    +        this.threadsNum = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
    +        this.queueSize = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100);
    +
    +        this.assignmentsQueue = new HashMap<>();
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize));
    +        }
    +        //start the thread pool
    +        this.service = Executors.newFixedThreadPool(threadsNum);
    +        this.active = true;
    +        //start the threads
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.service.submit(new DistributeTask(this, i));
    +        }
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        this.active = false;
    +        this.service.shutdownNow();
    +        try {
    +            this.service.awaitTermination(10l, TimeUnit.SECONDS);
    +        } catch (InterruptedException e) {
    +            LOG.error("Failed to close assignments distribute service");
    +        }
    +        this.assignmentsQueue = null;
    +    }
    +
    +    public void addAssignmentsForNode(String node, SupervisorAssignments assignments) {
    +        try {
    +            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, assignments), 5l, TimeUnit.SECONDS);
    --- End diff --
    
    I also configured the SupervisorClient timeout to 5 seconds; the value is taken from ThriftConnectionType.


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    @HeartSaVioR I have made a PR for Storm 2.0. Thanks very much for your review work. This is the link:
    https://github.com/apache/storm/pull/2431/


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    @revans2 Thanks very much for your review work.


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138071918
  
    --- Diff: storm-core/pom.xml ---
    @@ -331,6 +331,10 @@
                 <artifactId>commons-codec</artifactId>
             </dependency>
             <dependency>
    +            <groupId>org.rocksdb</groupId>
    +            <artifactId>rocksdbjni</artifactId>
    +        </dependency>
    --- End diff --
    
    I am sorry to say that this is not a rolling upgrade.  This can only go into 2.x+ with rocksdbjni.  rocksdbjni is based off of jni as the name suggests so it cannot be shaded, and it is a common enough library that I am concerned that it will cause conflicts with many others when it shows up on the worker classpath.
    
    It is also a library that changes very quickly, so it is not simple to pick a version that is likely to work for everyone. 


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138074783
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/worker.clj ---
    @@ -47,9 +48,30 @@
     
     (defmulti mk-suicide-fn cluster-mode)
     
    +(defn get-local-assignment
    +  [conf storm-id cluster-state]
    +  (try
    +    (let [supervisor-cli (SupervisorClient/getConfiguredClient conf (memoized-local-hostname))
    +          assignment (converter/clojurify-assignment (.getLocalAssignmentForStorm (.getClient supervisor-cli) storm-id))]
    +      (try
    +        (.close supervisor-cli)
    +        (catch Throwable e
    +          (log-warn (.getMessage e) "Exception when close supervisor client.")))
    +      assignment)
    +    (catch Throwable e
    +      ;; if any error/exception thrown, fetch it from zookeeper
    +      (.remote-assignment-info cluster-state storm-id)
    +      )))
    +
     (defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port assignment-versions]
       (log-message "Reading Assignments.")
    -  (let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))]
    +  ;; TODO: keep a long time connection when worker need more heart beat to supervisor.
    --- End diff --
    
    Another TODO


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    @danny0405 Yes right. Thank you for your patience and understanding the policy of Storm contribution.


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138780949
  
    --- Diff: storm-core/src/clj/org/apache/storm/cluster.clj ---
    @@ -123,6 +133,7 @@
     (def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
     (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
     (def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT))
    +(def LEADERINFO-SUBTREE (str "/" LEADERINFO-ROOT))
    --- End diff --
    
    Actually, this is not strictly needed, but if we always get the leader from nimbus, we make 2 RPC requests every time the supervisor contacts the leader [supervisor to nimbus]. Getting it from ZooKeeper directly is a better way.


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    Also, given that the change is significant, we may want to integrate it only into 2.0.0 instead of also having it in the 1.x version line.


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138659723
  
    --- Diff: storm-core/src/clj/org/apache/storm/cluster.clj ---
    @@ -244,15 +255,19 @@
     
     ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
     (defnk mk-storm-cluster-state
    -  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
    -  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
    -                                [false cluster-state-spec]
    -                                [true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
    +  [conf :cluster-state nil :acls nil :context (ClusterStateContext.) :backend nil]
    --- End diff --
    
    Could we rename backend to assignments-backend?  backend feels too generic for what this actually is.


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    @HeartSaVioR @revans2 
    I have changed the default backend to the in-memory one, and I have also changed the distribution mode to asynchronous.
    I chose a RocksDB backend because when the cluster grows large, the assignment info takes more memory [in memory mode], but if this is not a factor, we can just remove it.


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    @danny0405 I am not concerned about the memory for the cache, because we are already keeping all of that in memory while we schedule the topologies anyways.  If it does prove to be an issue we can address it then.


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    @danny0405 the most recent changes look good, but I really want to spend some time to dig into the code.


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    @HeartSaVioR @revans2 I have moved all the features to
    https://github.com/apache/storm/pull/2389 for 1.1.x-branch
    https://github.com/apache/storm/pull/2433 for 2.0-branch
    so this is closed now


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    @revans2 
    I have removed the rocks db backend



---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138662304
  
    --- Diff: storm-core/src/jvm/org/apache/storm/assignments/AssignmentDistributionService.java ---
    @@ -0,0 +1,232 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.assignments;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.generated.SupervisorAssignments;
    +import org.apache.storm.utils.SupervisorClient;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Random;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * <p>A service for distributing master assignments to supervisors, this service makes the assignments notification asynchronous.
    + * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer.
    + * <p>Master will shuffle its node request to the queues, if the target queue is full, we just discard the request, let the supervisors sync instead.
    + * <p/>
    + * <pre>{@code
    + * Working mode
    + *                      +--------+         +-----------------+
    + *                      | queue1 |   ==>   | Working thread1 |
    + * +--------+ shuffle   +--------+         +-----------------+
    + * | Master |   ==>
    + * +--------+           +--------+         +-----------------+
    + *                      | queue2 |   ==>   | Working thread2 |
    + *                      +--------+         +-----------------+
    + * }
    + * </pre>
    + */
    +public class AssignmentDistributionService implements Closeable {
    +    private static final Logger LOG = LoggerFactory.getLogger(AssignmentDistributionService.class);
    +    private ExecutorService service;
    +
    +    /**
    +     * Flag to indicate if the service is active
    +     */
    +    private volatile boolean active = false;
    +
    +    private Random random;
    +    /**
    +     * Working threads num.
    +     */
    +    private int threadsNum = 0;
    +    /**
    +     * Working thread queue size.
    +     */
    +    private int queueSize = 0;
    +
    +    /**
    +     * Assignments request queue.
    +     */
    +    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue;
    +
    +    private Map conf;
    +
    +    /**
    +     * Function for initialization.
    +     *
    +     * @param conf
    +     */
    +    public void prepare(Map conf) {
    +        this.conf = conf;
    +        this.random = new Random(47);
    +
    +        this.threadsNum = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
    +        this.queueSize = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100);
    +
    +        this.assignmentsQueue = new HashMap<>();
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize));
    +        }
    +        //start the thread pool
    +        this.service = Executors.newFixedThreadPool(threadsNum);
    +        this.active = true;
    +        //start the threads
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.service.submit(new DistributeTask(this, i));
    +        }
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        this.active = false;
    +        this.service.shutdownNow();
    +        try {
    +            this.service.awaitTermination(10l, TimeUnit.SECONDS);
    +        } catch (InterruptedException e) {
    +            LOG.error("Failed to close assignments distribute service");
    +        }
    +        this.assignmentsQueue = null;
    +    }
    +
    +    public void addAssignmentsForNode(String node, SupervisorAssignments assignments) {
    +        try {
    +            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, assignments), 5l, TimeUnit.SECONDS);
    --- End diff --
    
    So we have a 5 second timeout on offering, instead of a 5 second timeout on connecting and sending?
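
The distinction raised here can be seen with a bounded `LinkedBlockingQueue` alone. The sketch below assumes nothing about Storm; `OfferTimeoutSketch` and its two helpers are hypothetical names. The timed `offer` makes the caller wait when the queue is full, whereas the untimed `offer` returns immediately; neither one bounds the time spent connecting to or sending to a supervisor.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: timed vs. untimed offer on a bounded queue. */
class OfferTimeoutSketch {
    /** Returns immediately; false means the queue was full and the request is dropped. */
    static boolean offerNow(LinkedBlockingQueue<String> queue, String node) {
        return queue.offer(node);
    }

    /** Blocks the caller for up to waitMillis when the queue is full. */
    static boolean offerWithWait(LinkedBlockingQueue<String> queue, String node, long waitMillis) {
        try {
            return queue.offer(node, waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        System.out.println("first offer accepted: " + offerNow(queue, "node-1"));
        System.out.println("second offer accepted: " + offerNow(queue, "node-2"));
        long start = System.nanoTime();
        boolean accepted = offerWithWait(queue, "node-3", 50L);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
        System.out.println("timed offer accepted: " + accepted + ", caller waited about " + elapsedMs + " ms");
    }
}
```

If the producer is the scheduling loop, the untimed form keeps that loop from stalling on a full queue, and the periodic supervisor sync can pick up whatever was dropped.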


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138074168
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---
    @@ -758,6 +752,35 @@
           (catch Exception e
             (log-error e "Error running profiler actions, will retry again later")))))
     
    +(defn assigned-assignments-to-local!
    +  [^SupervisorAssignments supervisorAssignments supervisor]
    +  (when (not-nil? supervisorAssignments)
    +    (let [serialized-assignments (into {} (for [[tid amt] (.get_storm_assignment supervisorAssignments)]
    +                                            {tid (Utils/serialize amt)}))]
    +      (.sync-remote-assignments! (:storm-cluster-state supervisor) serialized-assignments))))
    +
    +;; Supervisor should be told that who is leader.
    +;; Fetch leader info each time before request node assignment.
    +;; TODO: get leader address from zk directly.
    --- End diff --
    
    Another TODO here


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138780747
  
    --- Diff: storm-core/src/clj/org/apache/storm/cluster.clj ---
    @@ -244,15 +255,19 @@
     
     ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
     (defnk mk-storm-cluster-state
    -  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
    -  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
    -                                [false cluster-state-spec]
    -                                [true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
    +  [conf :cluster-state nil :acls nil :context (ClusterStateContext.) :backend nil]
    --- End diff --
    
    Okay, I will rename it.


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    @HeartSaVioR OK, I will do the 2.0.0 version. The promotion has been running on our production cluster for a while and it works great. I will start to work on the 2.0.0 branch now; is the master branch the 2.0 version?


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138662036
  
    --- Diff: storm-core/src/jvm/org/apache/storm/assignments/AssignmentDistributionService.java ---
    @@ -0,0 +1,232 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.assignments;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.generated.SupervisorAssignments;
    +import org.apache.storm.utils.SupervisorClient;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Random;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * <p>A service for distributing master assignments to supervisors, this service makes the assignments notification asynchronous.
    + * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer.
    + * <p>Master will shuffle its node request to the queues, if the target queue is full, we just discard the request, let the supervisors sync instead.
    + * <p/>
    + * <pre>{@code
    + * Working mode
    + *                      +--------+         +-----------------+
    + *                      | queue1 |   ==>   | Working thread1 |
    + * +--------+ shuffle   +--------+         +-----------------+
    + * | Master |   ==>
    + * +--------+           +--------+         +-----------------+
    + *                      | queue2 |   ==>   | Working thread2 |
    + *                      +--------+         +-----------------+
    + * }
    + * </pre>
    + */
    +public class AssignmentDistributionService implements Closeable {
    +    private static final Logger LOG = LoggerFactory.getLogger(AssignmentDistributionService.class);
    +    private ExecutorService service;
    +
    +    /**
    +     * Flag to indicate if the service is active
    +     */
    +    private volatile boolean active = false;
    +
    +    private Random random;
    +    /**
    +     * Working threads num.
    +     */
    +    private int threadsNum = 0;
    +    /**
    +     * Working thread queue size.
    +     */
    +    private int queueSize = 0;
    +
    +    /**
    +     * Assignments request queue.
    +     */
    +    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue;
    +
    +    private Map conf;
    +
    +    /**
    +     * Function for initialization.
    +     *
    +     * @param conf
    +     */
    +    public void prepare(Map conf) {
    +        this.conf = conf;
    +        this.random = new Random(47);
    +
    +        this.threadsNum = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
    +        this.queueSize = Utils.getInt(conf.get(Config.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100);
    +
    +        this.assignmentsQueue = new HashMap<>();
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize));
    +        }
    +        //start the thread pool
    +        this.service = Executors.newFixedThreadPool(threadsNum);
    +        this.active = true;
    +        //start the threads
    +        for (int i = 0; i < threadsNum; i++) {
    +            this.service.submit(new DistributeTask(this, i));
    +        }
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        this.active = false;
    +        this.service.shutdownNow();
    +        try {
    +            this.service.awaitTermination(10l, TimeUnit.SECONDS);
    +        } catch (InterruptedException e) {
    +            LOG.error("Failed to close assignments distribute service");
    +        }
    +        this.assignmentsQueue = null;
    +    }
    +
    +    public void addAssignmentsForNode(String node, SupervisorAssignments assignments) {
    +        try {
    +            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, assignments), 5l, TimeUnit.SECONDS);
    +            if (!success) {
    +                LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full", node);
    +            }
    +
    +        } catch (InterruptedException e) {
    +            LOG.error("Add node assignments interrupted: {}", e.getMessage());
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    static class NodeAssignments {
    +        private String node;
    +        private SupervisorAssignments assignments;
    +
    +        private NodeAssignments(String node, SupervisorAssignments assignments) {
    +            this.node = node;
    +            this.assignments = assignments;
    +        }
    +
    +        public static NodeAssignments getInstance(String node, SupervisorAssignments assignments) {
    +            return new NodeAssignments(node, assignments);
    +        }
    +
    +        public String getNode() {
    +            return this.node;
    +        }
    +
    +        public SupervisorAssignments getAssignments() {
    +            return this.assignments;
    +        }
    +
    +    }
    +
    +    /**
    +     * Task to distribute assignments.
    +     */
    +    static class DistributeTask implements Runnable {
    +        private AssignmentDistributionService service;
    +        private Integer queueIndex;
    +
    +        DistributeTask(AssignmentDistributionService service, Integer index) {
    +            this.service = service;
    +            this.queueIndex = index;
    +        }
    +
    +        @Override
    +        public void run() {
    +            while (true) {
    +                try {
    +                    NodeAssignments nodeAssignments = this.service.nextAssignments(queueIndex);
    +                    sendAssignmentsToNode(nodeAssignments);
    +                } catch (InterruptedException e) {
    +                    if (service.isActive()) {
    +                        LOG.error("Get an unexpected interrupt when distributing assignments to node, {}", e.getCause());
    +                    } else {
    +                        // service is off now just interrupt it.
    +                        Thread.currentThread().interrupt();
    +                    }
    +                }
    +            }
    +        }
    +
    +        private void sendAssignmentsToNode(NodeAssignments assignments) {
    +            SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(), assignments.getNode());
    --- End diff --
    
    SupervisorClient should be AutoCloseable, so we can use Java's try-with-resources to ensure it is closed.
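
A minimal sketch of what the AutoCloseable suggestion could look like. `FakeSupervisorClient` and `send` are hypothetical stand-ins invented for illustration; they are not the real `SupervisorClient` API from the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class AutoCloseableClientSketch {

    /** Hypothetical stand-in for SupervisorClient; not the real Storm class. */
    static class FakeSupervisorClient implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final String host;

        FakeSupervisorClient(String host) {
            this.host = host;
        }

        /** Pretends to send assignments; rejects a null payload to simulate a failure. */
        void sendAssignments(String payload) {
            if (payload == null) {
                throw new IllegalArgumentException("no assignments for " + host);
            }
        }

        boolean isClosed() {
            return closed.get();
        }

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /** Last client created by send(), kept only so the sketch can show it was closed. */
    static FakeSupervisorClient lastClient;

    /** Sends a payload; the client is closed on both the success and the failure path. */
    static boolean send(String host, String payload) {
        try (FakeSupervisorClient client = new FakeSupervisorClient(host)) {
            lastClient = client;
            client.sendAssignments(payload);
            return true;
        } catch (IllegalArgumentException e) {
            // close() has already run by the time this catch block executes.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(send("node-1", "assignments") + " closed=" + lastClient.isClosed());
        System.out.println(send("node-2", null) + " closed=" + lastClient.isClosed());
    }
}
```

With try-with-resources the caller cannot forget the `close()` call, which is the point of the review comment.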


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    @revans2 Hi, is this PR okay to merge now? I have another heartbeats improvement that is based on this one, and I would appreciate your review of it too.


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138091372
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---
    @@ -758,6 +752,35 @@
           (catch Exception e
             (log-error e "Error running profiler actions, will retry again later")))))
     
    +(defn assigned-assignments-to-local!
    +  [^SupervisorAssignments supervisorAssignments supervisor]
    +  (when (not-nil? supervisorAssignments)
    +    (let [serialized-assignments (into {} (for [[tid amt] (.get_storm_assignment supervisorAssignments)]
    +                                            {tid (Utils/serialize amt)}))]
    +      (.sync-remote-assignments! (:storm-cluster-state supervisor) serialized-assignments))))
    +
    +;; Supervisor should be told that who is leader.
    +;; Fetch leader info each time before request node assignment.
    +;; TODO: get leader address from zk directly.
    +(defn assignments-from-master
    +  [conf supervisor]
    +  (let [client (atom nil)]
    +    (try
    +      (let [master-client (NimbusClient/getConfiguredClientAs conf nil)
    +            _ (reset! client master-client) ;; keep a refence so we can close it
    +            supervisor-assignments (.getSupervisorAssignments (.getClient master-client) (:my-hostname supervisor))]
    +        (assigned-assignments-to-local! supervisor-assignments supervisor))
    +      (catch Throwable e
    --- End diff --
    
    Okay, but if we just throw it, the supervisor will restart more frequently. I don't have a good answer right now; this needs a discussion.


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138097813
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---
    @@ -961,14 +1015,25 @@
         ;; tasks figure out what tasks to talk to by looking at topology at runtime
         ;; only log/set when there's been a change to the assignment
         (doseq [[topology-id assignment] new-assignments
    -            :let [existing-assignment (get existing-assignments topology-id)
    -                  topology-details (.getById topologies topology-id)]]
    +            :let [existing-assignment (get existing-assignments topology-id)]]
           (if (= existing-assignment assignment)
             (log-debug "Assignment for " topology-id " hasn't changed")
             (do
               (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
               (.set-assignment! storm-cluster-state topology-id assignment)
               )))
    +
    +    ;; grouping assignment by node to see the nodes diff, then notify nodes/supervisors to synchronize its owned assignment
    +    ;; because the number of existing assignments is small for every scheduling round,
    +    ;; we expect to notify supervisors at almost the same time.
    +    (->> new-assignments
    +         (map (fn [[tid new-assignment]]
    +           (let [existing-assignment (get existing-assignments tid)]
    +             (assignment-changed-nodes existing-assignment new-assignment ))))
    +         (apply concat)
    +         (into #{})
    +         (notify-supervisors-assignments conf new-assignments))
    --- End diff --
    
    Thanks a lot for your suggestions. I agree that putting a blocking method into nimbus's main loop is not a good idea; a thread pool is a better choice.
    Because only the nodes whose assignments changed are notified in a round, the number of nodes will not be very large [ unless we submit really large topologies, or a large number of topologies at almost the same time ].
    Supervisors also synchronize their assignments every 10 seconds, so making nimbus's mk-assignments totally asynchronous will be okay.
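
A rough sketch of the thread-pool idea discussed here: the scheduling loop only does a non-blocking `offer` onto a bounded queue, and worker threads do the slow sends. All names (`AsyncNotifySketch`, `notifyNode`, the `sender` callback) are assumptions made for illustration and do not come from the PR.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AsyncNotifySketch implements AutoCloseable {
    private final BlockingQueue<String> pending;
    private final ExecutorService workers;

    AsyncNotifySketch(int threads, int queueSize, Consumer<String> sender) {
        this.pending = new LinkedBlockingQueue<>(queueSize);
        this.workers = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            workers.submit(() -> {
                try {
                    while (true) {
                        String node = pending.take(); // blocks until work arrives
                        try {
                            sender.accept(node);      // the slow RPC would happen here
                        } catch (RuntimeException e) {
                            // A failed send is dropped; the periodic supervisor sync covers it.
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdown requested: leave the loop
                }
            });
        }
    }

    /** Called from the scheduling loop; never blocks. Returns false if the queue is full. */
    boolean notifyNode(String node) {
        return pending.offer(node);
    }

    @Override
    public void close() {
        workers.shutdownNow();
        try {
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Queues three node names and waits until the workers have delivered them. */
    static Set<String> demo() {
        Set<String> delivered = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(3);
        try (AsyncNotifySketch service = new AsyncNotifySketch(2, 100, node -> {
            delivered.add(node);
            done.countDown();
        })) {
            service.notifyNode("node-a");
            service.notifyNode("node-b");
            service.notifyNode("node-c");
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(demo().size());
    }
}
```

Dropping a notification when the queue is full is tolerable in this design only because supervisors also pull their assignments on a fixed interval.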


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138659373
  
    --- Diff: storm-core/src/clj/org/apache/storm/cluster.clj ---
    @@ -244,15 +255,19 @@
     
     ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
     (defnk mk-storm-cluster-state
    -  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
    -  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
    -                                [false cluster-state-spec]
    -                                [true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
    +  [conf :cluster-state nil :acls nil :context (ClusterStateContext.) :backend nil]
    +  (let [[solo? cluster-state] (if (and (not-nil? cluster-state) (instance? ClusterState cluster-state))
    +                                [false cluster-state]
    +                                [true (mk-distributed-cluster-state conf :auth-conf conf :acls acls :context context)])
    +        assignments-backend (if (nil? backend)
    +                              (doto (InMemoryAssignmentBackend.) (.prepare nil nil))
    --- End diff --
    
    Why do we call `(.prepare nil nil)` only on the InMemoryAssignmentBackend?  It feels like we might be making it difficult to extend in the future.  From what I have seen, the assignments dir is a subdirectory of storm-local.  Could we just make prepare take the conf, so that we can pass in the conf we already have?
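
One possible shape for this suggestion, as a sketch only. The `AssignmentBackend` interface, both implementations, and `create` below are hypothetical and are not the PR's actual classes; the only real name assumed is the `storm.local.dir` config key.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

public class BackendPrepareSketch {

    /** Hypothetical backend contract: every implementation is prepared from the conf. */
    interface AssignmentBackend {
        void prepare(Map<String, Object> conf);

        String describe();
    }

    /** Needs nothing from the conf, but still honors the same prepare() contract. */
    static class InMemoryBackend implements AssignmentBackend {
        @Override
        public void prepare(Map<String, Object> conf) {
            // nothing to configure
        }

        @Override
        public String describe() {
            return "in-memory";
        }
    }

    /** Derives its directory from the conf instead of taking extra arguments. */
    static class LocalDiskBackend implements AssignmentBackend {
        private File dir;

        @Override
        public void prepare(Map<String, Object> conf) {
            String localDir = String.valueOf(conf.get("storm.local.dir"));
            this.dir = new File(localDir, "assignments");
        }

        @Override
        public String describe() {
            return dir.getPath();
        }
    }

    /** Falls back to the in-memory backend, and prepares whichever backend is chosen. */
    static AssignmentBackend create(AssignmentBackend backend, Map<String, Object> conf) {
        AssignmentBackend chosen = (backend == null) ? new InMemoryBackend() : backend;
        chosen.prepare(conf);
        return chosen;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("storm.local.dir", "storm-local");
        System.out.println(create(null, conf).describe());
        System.out.println(create(new LocalDiskBackend(), conf).describe());
    }
}
```

Calling `prepare(conf)` on every backend in one place keeps the caller from needing to know which implementation it was handed.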


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    Before looking into the details: according to the design doc, and as I commented on the JIRA issue, I don't see a reason to store the cache on disk, since ZK is still the source of truth and a process should sync up with ZK whenever it starts, because the cache may be outdated. Keeping the cache in memory looks sufficient.


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2319#discussion_r138074935
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/worker.clj ---
    @@ -351,6 +373,8 @@
                 (.sendLoadMetrics (:receiver worker) local-pop)
                 (reset! next-update (+ LOAD-REFRESH-INTERVAL-MS now))))))))
     
    +;;TODO: define a method to get this
    --- End diff --
    
    Another TODO


---

[GitHub] storm pull request #2319: [STORM-2693] Nimbus assignments promotion

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 closed the pull request at:

    https://github.com/apache/storm/pull/2319


---

[GitHub] storm issue #2319: [STORM-2693] Nimbus assignments promotion

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2319
  
    @danny0405 conceptually this is a wonderful improvement.  I will try to take some time looking into this patch.  One of the big things we will need, though, is a corresponding patch for the master branch.  You don't have to do it now.  Waiting until the reviews are done is probably best, but I want to set the expectations appropriately.


---