You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by abhishekagarwal87 <gi...@git.apache.org> on 2016/11/01 11:34:16 UTC

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

GitHub user abhishekagarwal87 opened a pull request:

    https://github.com/apache/storm/pull/1756

    STORM-1278: Port org.apache.storm.daemon.worker to java

    @revans2 - I may need some help in repairing worker unit tests. 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/abhishekagarwal87/storm STORM-1278

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1756.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1756
    
----
commit f9fb20696b4e6e9dd7419b291087123c65cbd35c
Author: Abhishek Agarwal <ab...@appdynamics.com>
Date:   2016-10-31T08:20:06Z

    STORM-1278: Merge with master

commit 136995aaf28589bfaa6ce9c19421a4615afb511f
Author: Abhishek Agarwal <ab...@appdynamics.com>
Date:   2016-11-01T10:50:25Z

    Fix bugs

commit 175871f93aba9819696a64f37f9709c1123b3b21
Author: Abhishek Agarwal <ab...@appdynamics.com>
Date:   2016-11-01T11:23:39Z

    Fixed transactional test

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87161787
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java ---
    @@ -0,0 +1,157 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import org.apache.logging.log4j.Level;
    +import org.apache.logging.log4j.LogManager;
    +import org.apache.logging.log4j.core.LoggerContext;
    +import org.apache.logging.log4j.core.config.Configuration;
    +import org.apache.logging.log4j.core.config.LoggerConfig;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.generated.LogConfig;
    +import org.apache.storm.generated.LogLevel;
    +import org.apache.storm.generated.LogLevelAction;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +public class LogConfigManager {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(LogConfigManager.class);
    +
    +    private final AtomicReference<TreeMap<String, LogLevel>> latestLogConfig;
    +    private final Map<String, Level> originalLogLevels;
    +
    +    public LogConfigManager() {
    +        this(new AtomicReference<>(new TreeMap<>()));
    +    }
    +
    +    public LogConfigManager(AtomicReference<TreeMap<String, LogLevel>> latestLogConfig) {
    +        this.latestLogConfig = latestLogConfig;
    +        this.originalLogLevels = getLoggerLevels();
    +        LOG.info("Started with log levels: {}", originalLogLevels);
    +    }
    +
    +    public void processLogConfigChange(LogConfig logConfig) {
    +        if (null != logConfig) {
    +            LOG.debug("Processing received log config: {}", logConfig);
    +            TreeMap<String, LogLevel> loggers = new TreeMap<>(logConfig.get_named_logger_level());
    +            LoggerContext logContext = (LoggerContext) LogManager.getContext(false);
    +            Map<String, LogLevel> newLogConfigs = new HashMap<>();
    +            for (Map.Entry<String, LogLevel> entry : loggers.entrySet()) {
    +                String msgLoggerName = entry.getKey();
    +                msgLoggerName = ("ROOT".equalsIgnoreCase(msgLoggerName)) ? LogManager.ROOT_LOGGER_NAME : msgLoggerName;
    +                LogLevel loggerLevel = entry.getValue();
    +                // the new-timeouts map now contains logger => timeout
    +                if (loggerLevel.is_set_reset_log_level_timeout_epoch()) {
    +                    LogLevel copy = new LogLevel(loggerLevel);
    +                    if (originalLogLevels.containsKey(msgLoggerName)) {
    +                        copy.set_reset_log_level(originalLogLevels.get(msgLoggerName).name());
    +                    } else {
    +                        copy.set_reset_log_level(Level.INFO.name());
    +                    }
    +
    +                    //copy.unset_reset_log_level();
    --- End diff --
    
    Nitpick: remove comment here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87187329
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.base.Preconditions;
    +import com.lmax.disruptor.EventHandler;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.lang.ObjectUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.*;
    +import org.apache.storm.daemon.DaemonCommon;
    +import org.apache.storm.daemon.Shutdownable;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.executor.Executor;
    +import org.apache.storm.executor.ExecutorShutdown;
    +import org.apache.storm.executor.IRunningExecutor;
    +import org.apache.storm.executor.LocalExecutor;
    +import org.apache.storm.generated.*;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.IAutoCredentials;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.*;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
    +
    +import javax.security.auth.Subject;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.*;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +public class Worker implements Shutdownable, DaemonCommon {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
    +    private final Map conf;
    +    private final IContext context;
    +    private final String topologyId;
    +    private final String assignmentId;
    +    private final int port;
    +    private final String workerId;
    +    private final LogConfigManager logConfigManager;
    +
    +
    +    private WorkerState workerState;
    +    private AtomicReference<List<IRunningExecutor>> executorsAtom;
    +    private Thread transferThread;
    +    private WorkerBackpressureThread backpressureThread;
    +
    +    private AtomicReference<Credentials> credentialsAtom;
    +    private Subject subject;
    +    private Collection<IAutoCredentials> autoCreds;
    +
    +
    +    /**
    +     * TODO: should worker even take the topologyId as input? this should be
    +     * deducable from cluster state (by searching through assignments)
    +     * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency
    +     *
    +     * @param conf         - Storm configuration
    +     * @param context      -
    +     * @param topologyId   - topology id
    +     * @param assignmentId - assignement id
    +     * @param port         - port on which the worker runs
    +     * @param workerId     - worker id
    +     */
    +
    +    public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
    +        this.conf = conf;
    +        this.context = context;
    +        this.topologyId = topologyId;
    +        this.assignmentId = assignmentId;
    +        this.port = port;
    +        this.workerId = workerId;
    +        this.logConfigManager = new LogConfigManager();
    +    }
    +
    +    public void start() throws Exception {
    +        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
    +            conf);
    +        // because in local mode, its not a separate
    +        // process. supervisor will register it in this case
    +        // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
    +        if (!ConfigUtils.isLocalMode(conf)) {
    +            // Distributed mode
    +            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
    +            String pid = Utils.processPid();
    +            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
    +            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
    +                Charset.forName("UTF-8"));
    +        }
    +        final Map topologyConf =
    +            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
    +        List<ACL> acls = Utils.getWorkerACL(topologyConf);
    +        IStateStorage stateStorage =
    +            ClusterUtils.mkStateStorage(conf, topologyConf, acls, new ClusterStateContext(DaemonType.WORKER));
    +        IStormClusterState stormClusterState =
    +            ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext());
    +        Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
    +        autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
    +        subject = AuthUtils.populateSubject(null, autoCreds, initialCredentials.get_creds());
    +
    +        Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
    +            @Override public Object run() throws Exception {
    +                workerState =
    +                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
    +                        stormClusterState);
    +
    +                // Heartbeat here so that worker process dies if this fails
    +                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    +                // that worker is running and moves on
    +                doHeartBeat();
    +
    +                executorsAtom = new AtomicReference<>(null);
    +
    +                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    +                // to the supervisor
    +                workerState.heartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                        try {
    +                            doHeartBeat();
    +                        } catch (IOException e) {
    +                            throw new RuntimeException(e);
    +                        }
    +                    });
    +
    +                workerState.executorHeartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +                        Worker.this::doExecutorHeartbeats);
    +
    +                workerState.registerCallbacks();
    +
    +                workerState.refreshConnections(null);
    +
    +                workerState.activateWorkerWhenAllConnectionsReady();
    +
    +                workerState.refreshStormActive(null);
    +
    +                workerState.runWorkerStartHooks();
    +
    +                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    +                for (List<Long> e : workerState.getExecutors()) {
    +                    if (ConfigUtils.isLocalMode(topologyConf)) {
    +                        newExecutors.add(
    +                            LocalExecutor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    } else {
    +                        newExecutors.add(
    +                            Executor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    }
    +                }
    +                executorsAtom.set(newExecutors);
    +
    +                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
    +                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +
    +                // This thread will publish the messages destined for remote tasks to remote connections
    +                transferThread = Utils.asyncLoop(() -> {
    +                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    +                    return 0L;
    +                });
    +
    +                DisruptorBackpressureCallback disruptorBackpressureHandler =
    +                    mkDisruptorBackpressureHandler(workerState);
    +                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    +                workerState.transferQueue
    +                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    +                workerState.transferQueue
    +                    .setHighWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    +                workerState.transferQueue
    +                    .setLowWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    +
    +                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
    +                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
    +                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    +                    backpressureThread.start();
    +                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
    +                }
    +
    +                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
    +
    +                establishLogSettingCallback();
    +
    +                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
    +
    +                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
    +                        @Override public void run() {
    +                            checkCredentialsChanged();
    +                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    --- End diff --
    
    Before porting, we have another timer `refresh-backpressure-timer` which runs every `TASK_BACKPRESSURE_POLL_SECS`. 
    
    ```
        (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)		
          (.scheduleRecurring (:refresh-backpressure-timer worker) 0 (conf TASK-BACKPRESSURE-POLL-SECS) topology-backpressure-callback))
    ```
    
    IMHO it would be better to keep it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1756
  
    @abhishekagarwal87 @revans2 
    I'm even OK that if @revans2 crafts a new PR on top of this, or file an issue regarding my comments and merge first @abhishekagarwal87 doesn't mind.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1756
  
    @abhishekagarwal87 I know you are busy. If you don't have time to do the rework for this I would be happy to do it for you.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on the issue:

    https://github.com/apache/storm/pull/1756
  
    The PR is ready now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1756


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87163440
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.base.Preconditions;
    +import com.lmax.disruptor.EventHandler;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.lang.ObjectUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.*;
    --- End diff --
    
    Minor or maybe nitpick: it would be better to expand here and below three.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87189395
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.base.Preconditions;
    +import com.lmax.disruptor.EventHandler;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.lang.ObjectUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.*;
    +import org.apache.storm.daemon.DaemonCommon;
    +import org.apache.storm.daemon.Shutdownable;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.executor.Executor;
    +import org.apache.storm.executor.ExecutorShutdown;
    +import org.apache.storm.executor.IRunningExecutor;
    +import org.apache.storm.executor.LocalExecutor;
    +import org.apache.storm.generated.*;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.IAutoCredentials;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.*;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
    +
    +import javax.security.auth.Subject;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.*;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +public class Worker implements Shutdownable, DaemonCommon {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
    +    private final Map conf;
    +    private final IContext context;
    +    private final String topologyId;
    +    private final String assignmentId;
    +    private final int port;
    +    private final String workerId;
    +    private final LogConfigManager logConfigManager;
    +
    +
    +    private WorkerState workerState;
    +    private AtomicReference<List<IRunningExecutor>> executorsAtom;
    +    private Thread transferThread;
    +    private WorkerBackpressureThread backpressureThread;
    +
    +    private AtomicReference<Credentials> credentialsAtom;
    +    private Subject subject;
    +    private Collection<IAutoCredentials> autoCreds;
    +
    +
    +    /**
    +     * TODO: should worker even take the topologyId as input? this should be
    +     * deducable from cluster state (by searching through assignments)
    +     * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency
    +     *
    +     * @param conf         - Storm configuration
    +     * @param context      -
    +     * @param topologyId   - topology id
    +     * @param assignmentId - assignement id
    +     * @param port         - port on which the worker runs
    +     * @param workerId     - worker id
    +     */
    +
    +    public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
    +        this.conf = conf;
    +        this.context = context;
    +        this.topologyId = topologyId;
    +        this.assignmentId = assignmentId;
    +        this.port = port;
    +        this.workerId = workerId;
    +        this.logConfigManager = new LogConfigManager();
    +    }
    +
    +    public void start() throws Exception {
    +        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
    +            conf);
    +        // because in local mode, its not a separate
    +        // process. supervisor will register it in this case
    +        // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
    +        if (!ConfigUtils.isLocalMode(conf)) {
    +            // Distributed mode
    +            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
    +            String pid = Utils.processPid();
    +            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
    +            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
    +                Charset.forName("UTF-8"));
    +        }
    +        final Map topologyConf =
    +            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
    +        List<ACL> acls = Utils.getWorkerACL(topologyConf);
    +        IStateStorage stateStorage =
    +            ClusterUtils.mkStateStorage(conf, topologyConf, acls, new ClusterStateContext(DaemonType.WORKER));
    +        IStormClusterState stormClusterState =
    +            ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext());
    +        Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
    +        autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
    +        subject = AuthUtils.populateSubject(null, autoCreds, initialCredentials.get_creds());
    +
    +        Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
    +            @Override public Object run() throws Exception {
    +                workerState =
    +                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
    +                        stormClusterState);
    +
    +                // Heartbeat here so that worker process dies if this fails
    +                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    +                // that worker is running and moves on
    +                doHeartBeat();
    +
    +                executorsAtom = new AtomicReference<>(null);
    +
    +                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    +                // to the supervisor
    +                workerState.heartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                        try {
    +                            doHeartBeat();
    +                        } catch (IOException e) {
    +                            throw new RuntimeException(e);
    +                        }
    +                    });
    +
    +                workerState.executorHeartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +                        Worker.this::doExecutorHeartbeats);
    +
    +                workerState.registerCallbacks();
    +
    +                workerState.refreshConnections(null);
    +
    +                workerState.activateWorkerWhenAllConnectionsReady();
    +
    +                workerState.refreshStormActive(null);
    +
    +                workerState.runWorkerStartHooks();
    +
    +                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    +                for (List<Long> e : workerState.getExecutors()) {
    +                    if (ConfigUtils.isLocalMode(topologyConf)) {
    +                        newExecutors.add(
    +                            LocalExecutor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    } else {
    +                        newExecutors.add(
    +                            Executor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    }
    +                }
    +                executorsAtom.set(newExecutors);
    +
    +                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
    +                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +
    +                // This thread will publish the messages destined for remote tasks to remote connections
    +                transferThread = Utils.asyncLoop(() -> {
    +                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    +                    return 0L;
    +                });
    +
    +                DisruptorBackpressureCallback disruptorBackpressureHandler =
    +                    mkDisruptorBackpressureHandler(workerState);
    +                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    +                workerState.transferQueue
    +                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    +                workerState.transferQueue
    +                    .setHighWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    +                workerState.transferQueue
    +                    .setLowWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    +
    +                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
    +                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
    +                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    +                    backpressureThread.start();
    +                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
    +                }
    +
    +                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
    +
    +                establishLogSettingCallback();
    +
    +                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
    +
    +                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
    +                        @Override public void run() {
    +                            checkCredentialsChanged();
    +                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    +                               checkThrottleChanged();
    +                            }
    +                        }
    +                    });
    +
    +                // The jitter allows the clients to get the data at different times, and avoids thundering herd
    +                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    +                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
    +                }
    +
    +                workerState.refreshConnectionsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
    +
    +                workerState.resetLogTevelsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
    +
    +                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    +                    workerState::refreshStormActive);
    +
    +                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    +                LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
    +                return this;
    +            };
    +        });
    +
    +    }
    +
    +    public void doHeartBeat() throws IOException {
    +        LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
    +        state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
    +            workerState.executors.stream()
    +                .map(executor -> new ExecutorInfo(executor.get(0).intValue(), executor.get(1).intValue()))
    +                .collect(Collectors.toList()), workerState.port));
    +        state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
    +        // it shouldn't take supervisor 120 seconds between listing dir and reading it
    +    }
    +
    +    public void doExecutorHeartbeats() {
    +        Map<List<Integer>, ExecutorStats> stats;
    +        List<IRunningExecutor> executors = this.executorsAtom.get();
    +        if (null == executors) {
    +            stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.executors);
    +        } else {
    +            stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
    +                .toMap((Function<IRunningExecutor, List<Long>>) IRunningExecutor::getExecutorId,
    +                    (Function<IRunningExecutor, ExecutorStats>) IRunningExecutor::renderStats)));
    +        }
    +        Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
    +        try {
    +            workerState.stormClusterState
    +                .workerHeartbeat(workerState.topologyId, workerState.assignmentId, (long) workerState.port,
    +                    StatsUtil.thriftifyZkWorkerHb(zkHB));
    +        } catch (Exception ex) {
    +            LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", ex);
    +        }
    +    }
    +
    +    public void checkCredentialsChanged() {
    +        Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null);
    +        if (! ObjectUtils.equals(newCreds, credentialsAtom.get())) {
    +            // This does not have to be atomic, worst case we update when one is not needed
    +            AuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? null : newCreds.get_creds());
    +            for (IRunningExecutor executor : executorsAtom.get()) {
    +                executor.credenetialsChanged(newCreds);
    --- End diff --
    
    Nit: credenetialsChanged -> credentialsChanged
    Typo seems to be already existed, but another chance of fix has come.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r86344854
  
    --- Diff: storm-core/src/clj/org/apache/storm/testing.clj ---
    @@ -667,23 +665,18 @@
               (.put "spout-emitted" (AtomicInteger. 0))
               (.put "transferred" (AtomicInteger. 0))
               (.put "processed" (AtomicInteger. 0))))
    +      (LocalExecutor/setTrackId id#)
           (with-var-roots
    --- End diff --
    
    We don't need with-var-roots any more, we could just call with simulated-time-local-cluster without it


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r86233431
  
    --- Diff: storm-core/src/clj/org/apache/storm/daemon/local_executor.clj ---
    @@ -25,18 +25,3 @@
         (let [val (AddressedTuple. task tuple)]
    --- End diff --
    
    This entire file should really be removed and with-tracked-cluster should be updated to not use it.  It is obviously not needed because this function is never called, only over ridden.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r86168341
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.base.Preconditions;
    +import com.lmax.disruptor.EventHandler;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.lang.ObjectUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.*;
    +import org.apache.storm.daemon.DaemonCommon;
    +import org.apache.storm.daemon.Shutdownable;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.executor.Executor;
    +import org.apache.storm.executor.ExecutorShutdown;
    +import org.apache.storm.executor.IRunningExecutor;
    +import org.apache.storm.executor.LocalExecutor;
    +import org.apache.storm.generated.*;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.IAutoCredentials;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.*;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
    +
    +import javax.security.auth.Subject;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.*;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +public class Worker implements Shutdownable, DaemonCommon {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
    +    private final Map conf;
    +    private final IContext context;
    +    private final String topologyId;
    +    private final String assignmentId;
    +    private final int port;
    +    private final String workerId;
    +    private final LogConfigManager logConfigManager;
    +
    +
    +    private WorkerState workerState;
    +    private AtomicReference<List<IRunningExecutor>> executorsAtom;
    +    private Thread transferThread;
    +    private WorkerBackpressureThread backpressureThread;
    +
    +    private AtomicReference<Credentials> credentialsAtom;
    +    private Subject subject;
    +    private Collection<IAutoCredentials> autoCreds;
    +
    +
    +    /**
    +     * TODO: should worker even take the topologyId as input? this should be
    +     * deducable from cluster state (by searching through assignments)
    +     * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency
    +     *
    +     * @param conf         - Storm configuration
    +     * @param context      -
    +     * @param topologyId   - topology id
    +     * @param assignmentId - assignement id
    +     * @param port         - port on which the worker runs
    +     * @param workerId     - worker id
    +     */
    +
    +    public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
    +        this.conf = conf;
    +        this.context = context;
    +        this.topologyId = topologyId;
    +        this.assignmentId = assignmentId;
    +        this.port = port;
    +        this.workerId = workerId;
    +        this.logConfigManager = new LogConfigManager();
    +    }
    +
    +    public void start() throws Exception {
    +        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
    +            conf);
    +        // because in local mode, its not a separate
    +        // process. supervisor will register it in this case
    +        // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
    +        if (!ConfigUtils.isLocalMode(conf)) {
    +            // Distributed mode
    +            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
    +            String pid = Utils.processPid();
    +            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
    +            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
    +                Charset.defaultCharset());
    +        }
    +        final Map topologyConf =
    +            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
    +        List<ACL> acls = Utils.getWorkerACL(topologyConf);
    +        IStateStorage stateStorage =
    +            ClusterUtils.mkStateStorage(conf, topologyConf, acls, new ClusterStateContext(DaemonType.WORKER));
    +        IStormClusterState stormClusterState =
    +            ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext());
    +        Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
    +        autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
    +        subject = AuthUtils.populateSubject(null, autoCreds, initialCredentials.get_creds());
    +
    +        Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
    +            @Override public Object run() throws Exception {
    +                workerState =
    +                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
    +                        stormClusterState);
    +
    +                // Heartbeat here so that worker process dies if this fails
    +                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    +                // that worker is running and moves on
    +                doHeartBeat();
    +
    +                executorsAtom = new AtomicReference<>(null);
    +
    +                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    +                // to the supervisor
    +                workerState.heartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                        try {
    +                            doHeartBeat();
    +                        } catch (IOException e) {
    +                            throw new RuntimeException(e);
    +                        }
    +                    });
    +
    +                workerState.executorHeartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +                        Worker.this::doExecutorHeartbeats);
    +
    +                workerState.registerCallbacks();
    +
    +                workerState.refreshConnections(null);
    +
    +                workerState.activateWorkerWhenAllConnectionsReady();
    +
    +                workerState.refreshStormActive(null);
    +
    +                workerState.runWorkerStartHooks();
    +
    +                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    +                for (List<Long> e : workerState.getExecutors()) {
    +                    if (ConfigUtils.isLocalMode(topologyConf)) {
    +                        newExecutors.add(
    +                            LocalExecutor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    } else {
    +                        newExecutors.add(
    +                            Executor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    }
    +                }
    +                executorsAtom.set(newExecutors);
    +
    +                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
    +                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +
    +                // This thread will publish the messages destined for remote tasks to remote connections
    +                transferThread = Utils.asyncLoop(() -> {
    +                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    +                    return 0L;
    +                });
    +
    +                DisruptorBackpressureCallback disruptorBackpressureHandler =
    +                    mkDisruptorBackpressureHandler(workerState);
    +                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    +                workerState.transferQueue
    +                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    +                workerState.transferQueue
    +                    .setHighWaterMark((Double) topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK));
    +                workerState.transferQueue
    +                    .setLowWaterMark((Double) topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK));
    --- End diff --
    
    These values are not guaranteed to be Doubles.  They are guaranteed to be Number.  Please use Utils.getDouble instead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87336388
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
    @@ -0,0 +1,690 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.collect.ImmutableMap;
    +import com.google.common.collect.Sets;
    +import org.apache.storm.Config;
    +import org.apache.storm.Constants;
    +import org.apache.storm.StormTimer;
    +import org.apache.storm.cluster.IStateStorage;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.cluster.VersionedData;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.daemon.supervisor.AdvancedFSOps;
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.DebugOptions;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.generated.InvalidTopologyException;
    +import org.apache.storm.generated.NodeInfo;
    +import org.apache.storm.generated.StormBase;
    +import org.apache.storm.generated.StormBase;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.StreamInfo;
    +import org.apache.storm.generated.TopologyStatus;
    +import org.apache.storm.grouping.Load;
    +import org.apache.storm.grouping.LoadMapping;
    +import org.apache.storm.hooks.BaseWorkerHook;
    +import org.apache.storm.messaging.ConnectionWithStatus;
    +import org.apache.storm.messaging.DeserializingConnectionCallback;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.messaging.TransportFactory;
    +import org.apache.storm.serialization.KryoTupleSerializer;
    +import org.apache.storm.task.WorkerTopologyContext;
    +import org.apache.storm.tuple.AddressedTuple;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.DisruptorQueue;
    +import org.apache.storm.utils.ThriftTopologyUtils;
    +import org.apache.storm.utils.TransferDrainer;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.*;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +import java.util.function.Function;
    +import java.util.function.UnaryOperator;
    +import java.util.stream.Collectors;
    +
    +public class WorkerState {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(WorkerState.class);
    +
    +    final Map conf;
    +    final IContext mqContext;
    +
    +    public Map getConf() {
    +        return conf;
    +    }
    +
    +    public IConnection getReceiver() {
    +        return receiver;
    +    }
    +
    +    public String getTopologyId() {
    +        return topologyId;
    +    }
    +
    +    public int getPort() {
    +        return port;
    +    }
    +
    +    public String getWorkerId() {
    +        return workerId;
    +    }
    +
    +    public IStateStorage getStateStorage() {
    +        return stateStorage;
    +    }
    +
    +    public AtomicBoolean getIsTopologyActive() {
    +        return isTopologyActive;
    +    }
    +
    +    public AtomicReference<Map<String, DebugOptions>> getStormComponentToDebug() {
    +        return stormComponentToDebug;
    +    }
    +
    +    public Set<List<Long>> getExecutors() {
    +        return executors;
    +    }
    +
    +    public List<Integer> getTaskIds() {
    +        return taskIds;
    +    }
    +
    +    public Map getTopologyConf() {
    +        return topologyConf;
    +    }
    +
    +    public StormTopology getTopology() {
    +        return topology;
    +    }
    +
    +    public StormTopology getSystemTopology() {
    +        return systemTopology;
    +    }
    +
    +    public Map<Integer, String> getTaskToComponent() {
    +        return taskToComponent;
    +    }
    +
    +    public Map<String, Map<String, Fields>> getComponentToStreamToFields() {
    +        return componentToStreamToFields;
    +    }
    +
    +    public Map<String, List<Integer>> getComponentToSortedTasks() {
    +        return componentToSortedTasks;
    +    }
    +
    +    public AtomicReference<Map<NodeInfo, IConnection>> getCachedNodeToPortSocket() {
    +        return cachedNodeToPortSocket;
    +    }
    +
    +    public Map<List<Long>, DisruptorQueue> getExecutorReceiveQueueMap() {
    +        return executorReceiveQueueMap;
    +    }
    +
    +    public Runnable getSuicideCallback() {
    +        return suicideCallback;
    +    }
    +
    +    public Utils.UptimeComputer getUptime() {
    +        return uptime;
    +    }
    +
    +    public Map<String, Object> getDefaultSharedResources() {
    +        return defaultSharedResources;
    +    }
    +
    +    public Map<String, Object> getUserSharedResources() {
    +        return userSharedResources;
    +    }
    +
    +    final IConnection receiver;
    +    final String topologyId;
    +    final String assignmentId;
    +    final int port;
    +    final String workerId;
    +    final IStateStorage stateStorage;
    +    final IStormClusterState stormClusterState;
    +
    +    // when worker bootup, worker will start to setup initial connections to
    +    // other workers. When all connection is ready, we will enable this flag
    +    // and spout and bolt will be activated.
    +    // used in worker only, keep it as atomic
    +    final AtomicBoolean isWorkerActive;
    +    final AtomicBoolean isTopologyActive;
    +    final AtomicReference<Map<String, DebugOptions>> stormComponentToDebug;
    +
    +    // executors and taskIds running in this worker
    +    final Set<List<Long>> executors;
    +    final List<Integer> taskIds;
    +    final Map topologyConf;
    +    final StormTopology topology;
    +    final StormTopology systemTopology;
    +    final Map<Integer, String> taskToComponent;
    +    final Map<String, Map<String, Fields>> componentToStreamToFields;
    +    final Map<String, List<Integer>> componentToSortedTasks;
    +    final ReentrantReadWriteLock endpointSocketLock;
    +    final AtomicReference<Map<Integer, NodeInfo>> cachedTaskToNodePort;
    +    final AtomicReference<Map<NodeInfo, IConnection>> cachedNodeToPortSocket;
    +    final Map<List<Long>, DisruptorQueue> executorReceiveQueueMap;
    +    // executor id is in form [start_task_id end_task_id]
    +    // short executor id is start_task_id
    +    final Map<Integer, DisruptorQueue> shortExecutorReceiveQueueMap;
    +    final Map<Integer, Integer> taskToShortExecutor;
    +    final Runnable suicideCallback;
    +    final Utils.UptimeComputer uptime;
    +    final Map<String, Object> defaultSharedResources;
    +    final Map<String, Object> userSharedResources;
    +    final LoadMapping loadMapping;
    +    final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
    +    // Whether this worker is going slow
    +    final AtomicBoolean backpressure = new AtomicBoolean(false);
    +    // If the transfer queue is backed-up
    +    final AtomicBoolean transferBackpressure = new AtomicBoolean(false);
    +    // a trigger for synchronization with executors
    +    final AtomicBoolean backpressureTrigger = new AtomicBoolean(false);
    +    // whether the throttle is activated for spouts
    +    final AtomicBoolean throttleOn = new AtomicBoolean(false);
    +
    +    public LoadMapping getLoadMapping() {
    +        return loadMapping;
    +    }
    +
    +    public AtomicReference<Map<String, VersionedData<Assignment>>> getAssignmentVersions() {
    +        return assignmentVersions;
    +    }
    +
    +    public AtomicBoolean getBackpressureTrigger() {
    +        return backpressureTrigger;
    +    }
    +
    +    public AtomicBoolean getThrottleOn() {
    +        return throttleOn;
    +    }
    +
    +    public DisruptorQueue getTransferQueue() {
    +        return transferQueue;
    +    }
    +
    +    public StormTimer getUserTimer() {
    +        return userTimer;
    +    }
    +
    +    final DisruptorQueue transferQueue;
    +
    +    // Timers
    +    final StormTimer heartbeatTimer = mkHaltingTimer("heartbeat-timer");
    +    final StormTimer refreshLoadTimer = mkHaltingTimer("refresh-load-timer");
    +    final StormTimer refreshConnectionsTimer = mkHaltingTimer("refresh-connections-timer");
    +    final StormTimer refreshCredentialsTimer = mkHaltingTimer("refresh-credentials-timer");
    +    final StormTimer resetLogTevelsTimer = mkHaltingTimer("reset-log-levels-timer");
    +    final StormTimer refreshActiveTimer = mkHaltingTimer("refresh-active-timer");
    +    final StormTimer executorHeartbeatTimer = mkHaltingTimer("executor-heartbeat-timer");
    +    final StormTimer userTimer = mkHaltingTimer("user-timer");
    +
    +    // global variables only used internally in class
    +    private final List<Integer> outboundTasks;
    +    private final AtomicLong nextUpdate = new AtomicLong(0);
    +    private final boolean trySerializeLocal;
    +    private final TransferDrainer drainer;
    +
    +    private static final long LOAD_REFRESH_INTERVAL_MS = 5000L;
    +
    +    public WorkerState(Map conf, IContext mqContext, String topologyId, String assignmentId, int port, String workerId,
    +        Map topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState)
    +        throws IOException, InvalidTopologyException {
    +        this.executors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
    +        this.transferQueue = new DisruptorQueue("worker-transfer-queue",
    +            Utils.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE)),
    +            (long) topologyConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS),
    +            Utils.getInt(topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE)),
    +            (long) topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
    +
    +        this.conf = conf;
    +        this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
    +        this.receiver = this.mqContext.bind(topologyId, port);
    +        this.topologyId = topologyId;
    +        this.assignmentId = assignmentId;
    +        this.port = port;
    +        this.workerId = workerId;
    +        this.stateStorage = stateStorage;
    +        this.stormClusterState = stormClusterState;
    +        this.isWorkerActive = new AtomicBoolean(false);
    +        this.isTopologyActive = new AtomicBoolean(false);
    +        this.stormComponentToDebug = new AtomicReference<>();
    +        this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, executors);
    +        this.shortExecutorReceiveQueueMap = new HashMap<>();
    +        this.taskIds = new ArrayList<>();
    +        for (Map.Entry<List<Long>, DisruptorQueue> entry : executorReceiveQueueMap.entrySet()) {
    +            this.shortExecutorReceiveQueueMap.put(entry.getKey().get(0).intValue(), entry.getValue());
    +            this.taskIds.addAll(StormCommon.executorIdToTasks(entry.getKey()));
    +        }
    +        Collections.sort(taskIds);
    +        this.topologyConf = topologyConf;
    +        this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
    +        this.systemTopology = StormCommon.systemTopology(topologyConf, topology);
    +        this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
    +        this.componentToStreamToFields = new HashMap<>();
    +        for (String c : ThriftTopologyUtils.getComponentIds(systemTopology)) {
    +            Map<String, Fields> streamToFields = new HashMap<>();
    +            for (Map.Entry<String, StreamInfo> stream : ThriftTopologyUtils.getComponentCommon(systemTopology, c).get_streams().entrySet()) {
    +                streamToFields.put(stream.getKey(), new Fields(stream.getValue().get_output_fields()));
    +            }
    +            componentToStreamToFields.put(c, streamToFields);
    +        }
    +        this.componentToSortedTasks = Utils.reverseMap(taskToComponent);
    +        this.componentToSortedTasks.values().forEach(Collections::sort);
    +        this.endpointSocketLock = new ReentrantReadWriteLock();
    +        this.cachedNodeToPortSocket = new AtomicReference<>(new HashMap<>());
    +        this.cachedTaskToNodePort = new AtomicReference<>(new HashMap<>());
    +        this.taskToShortExecutor = new HashMap<>();
    +        for (List<Long> executor : this.executors) {
    +            for (Integer task : StormCommon.executorIdToTasks(executor)) {
    +                taskToShortExecutor.put(task, executor.get(0).intValue());
    +            }
    +        }
    +        this.suicideCallback = Utils.mkSuicideFn();
    +        this.uptime = Utils.makeUptimeComputer();
    +        this.defaultSharedResources = makeDefaultResources();
    +        this.userSharedResources = makeUserResources();
    +        this.loadMapping = new LoadMapping();
    +        this.assignmentVersions = new AtomicReference<>(new HashMap<>());
    +        this.outboundTasks = workerOutboundTasks();
    +        this.trySerializeLocal = topologyConf.containsKey(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE)
    +            && (Boolean) topologyConf.get(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
    +        if (trySerializeLocal) {
    +            LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
    +        }
    +        this.drainer = new TransferDrainer();
    +    }
    +
    +    public void refreshConnections() {
    +        try {
    +            refreshConnections(() -> refreshConnectionsTimer.schedule(0, this::refreshConnections));
    +        } catch (Exception e) {
    +            throw Utils.wrapInRuntime(e);
    +        }
    +    }
    +
    +    public void refreshConnections(Runnable callback) throws Exception {
    +        Integer version = stormClusterState.assignmentVersion(topologyId, callback);
    +        version = (null == version) ? 0 : version;
    +        VersionedData<Assignment> assignmentVersion = assignmentVersions.get().get(topologyId);
    +        Assignment assignment;
    +        if (null != assignmentVersion && (assignmentVersion.getVersion() == version)) {
    +            assignment = assignmentVersion.getData();
    +        } else {
    +            VersionedData<Assignment>
    +                newAssignmentVersion = new VersionedData<>(version,
    +                stormClusterState.assignmentInfoWithVersion(topologyId, callback).getData());
    +            assignmentVersions.getAndUpdate(prev -> {
    +                Map<String, VersionedData<Assignment>> next = new HashMap<>(prev);
    +                next.put(topologyId, newAssignmentVersion);
    +                return next;
    +            });
    +            assignment = newAssignmentVersion.getData();
    +        }
    +
    +        Set<NodeInfo> neededConnections = new HashSet<>();
    +        Map<Integer, NodeInfo> newTaskToNodePort = new HashMap<>();
    +        if (null != assignment) {
    +            Map<Integer, NodeInfo> taskToNodePort = StormCommon.taskToNodeport(assignment.get_executor_node_port());
    +            for (Map.Entry<Integer, NodeInfo> taskToNodePortEntry : taskToNodePort.entrySet()) {
    +                Integer task = taskToNodePortEntry.getKey();
    +                if (outboundTasks.contains(task)) {
    +                    newTaskToNodePort.put(task, taskToNodePortEntry.getValue());
    +                    if (!taskIds.contains(task)) {
    +                        neededConnections.add(taskToNodePortEntry.getValue());
    +                    }
    +                }
    +            }
    +        }
    +
    +        Set<NodeInfo> currentConnections = cachedNodeToPortSocket.get().keySet();
    +        Set<NodeInfo> newConnections = Sets.difference(neededConnections, currentConnections);
    +        Set<NodeInfo> removeConnections = Sets.difference(currentConnections, neededConnections);
    +
    +        // Add new connections atomically
    +        cachedNodeToPortSocket.getAndUpdate(prev -> {
    +            Map<NodeInfo, IConnection> next = new HashMap<>(prev);
    +            for (NodeInfo nodeInfo : newConnections) {
    +                next.put(nodeInfo,
    +                    mqContext.connect(
    +                        topologyId,
    +                        assignment.get_node_host().get(nodeInfo.get_node()),    // Host
    +                        nodeInfo.get_port().iterator().next().intValue()));     // Port
    +            }
    +            return next;
    +        });
    +
    +
    +        try {
    +            endpointSocketLock.writeLock().lock();
    +            cachedTaskToNodePort.set(newTaskToNodePort);
    +        } finally {
    +            endpointSocketLock.writeLock().unlock();
    +        }
    +
    +        for (NodeInfo nodeInfo : removeConnections) {
    +            cachedNodeToPortSocket.get().get(nodeInfo).close();
    +        }
    +
    +        // Remove old connections atomically
    +        cachedNodeToPortSocket.getAndUpdate(prev -> {
    +            Map<NodeInfo, IConnection> next = new HashMap<>(prev);
    +            removeConnections.forEach(next::remove);
    +            return next;
    +        });
    +
    +    }
    +
    +    public void refreshStormActive() {
    +        refreshStormActive(() -> refreshActiveTimer.schedule(0, this::refreshStormActive));
    +    }
    +
    +    public void refreshStormActive(Runnable callback) {
    +        StormBase base = stormClusterState.stormBase(topologyId, callback);
    +        isTopologyActive.set(
    +            (null != base) &&
    +            (base.get_status() == TopologyStatus.ACTIVE) &&
    +            (isWorkerActive.get()));
    +        if (null != base) {
    +            Map<String, DebugOptions> debugOptionsMap = new HashMap<>(base.get_component_debug());
    +            for (DebugOptions debugOptions : debugOptionsMap.values()) {
    +                if (!debugOptions.is_set_samplingpct()) {
    +                    debugOptions.set_samplingpct(10);
    +                }
    +                if (!debugOptions.is_set_enable()) {
    +                    debugOptions.set_enable(false);
    +                }
    +            }
    +            stormComponentToDebug.set(debugOptionsMap);
    +            LOG.debug("Events debug options {}", stormComponentToDebug.get());
    +        }
    +    }
    +
    +    public void refreshThrottle() {
    +        boolean backpressure = stormClusterState.topologyBackpressure(topologyId, this::refreshThrottle);
    +        this.throttleOn.set(backpressure);
    +    }
    +
    +    public void refreshLoad() {
    +        Set<Integer> remoteTasks = Sets.difference(new HashSet<Integer>(outboundTasks), new HashSet<>(taskIds));
    +        Long now = System.currentTimeMillis();
    +        Map<Integer, Double> localLoad = shortExecutorReceiveQueueMap.entrySet().stream().collect(Collectors.toMap(
    +            (Function<Map.Entry<Integer, DisruptorQueue>, Integer>) Map.Entry::getKey,
    +            (Function<Map.Entry<Integer, DisruptorQueue>, Double>) entry -> {
    +                DisruptorQueue.QueueMetrics qMetrics = entry.getValue().getMetrics();
    +                return ( (double) qMetrics.population()) / qMetrics.capacity();
    +            }));
    +
    +        Map<Integer, Load> remoteLoad = new HashMap<>();
    +        cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks)));
    +        loadMapping.setLocal(localLoad);
    +        loadMapping.setRemote(remoteLoad);
    +
    +        if (now > nextUpdate.get()) {
    +            receiver.sendLoadMetrics(localLoad);
    +            nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
    +        }
    +    }
    +
    +    /**
    +     * we will wait all connections to be ready and then activate the spout/bolt
    +     * when the worker bootup
    +     */
    +    public void activateWorkerWhenAllConnectionsReady() {
    +        int delaySecs = 0;
    +        int recurSecs = 1;
    +        refreshActiveTimer.schedule(delaySecs, new Runnable() {
    +            @Override public void run() {
    +                if (areAllConnectionsReady()) {
    +                    LOG.info("All connections are ready for worker {}:{} with id", assignmentId, port, workerId);
    +                    isWorkerActive.set(Boolean.TRUE);
    +                } else {
    +                    refreshActiveTimer.schedule(recurSecs, () -> activateWorkerWhenAllConnectionsReady(), false, 0);
    +                }
    +            }
    +        });
    +    }
    +
    +    public void registerCallbacks() {
    +        LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
    +        receiver.registerRecv(new DeserializingConnectionCallback(topologyConf,
    +            getWorkerTopologyContext(),
    +            this::transferLocal));
    +    }
    +
    +    public void transferLocal(List<AddressedTuple> tupleBatch) {
    +        Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
    +        for (AddressedTuple tuple : tupleBatch) {
    +            Integer executor = taskToShortExecutor.get(tuple.dest);
    +            if (null == executor) {
    +                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
    +                continue;
    +            }
    +            List<AddressedTuple> current = grouped.get(executor);
    +            if (null == current) {
    +                current = new ArrayList<>();
    +                grouped.put(executor, current);
    +            }
    +            current.add(tuple);
    +        }
    +
    +        for (Map.Entry<Integer, List<AddressedTuple>> entry : grouped.entrySet()) {
    +            DisruptorQueue queue = shortExecutorReceiveQueueMap.get(entry.getKey());
    +            if (null != queue) {
    +                queue.publish(entry.getValue());
    +            } else {
    +                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
    +            }
    +        }
    +    }
    +
    +    public void transfer(KryoTupleSerializer serializer, List<AddressedTuple> tupleBatch) {
    +        if (trySerializeLocal) {
    +            assertCanSerialize(serializer, tupleBatch);
    +        }
    +        List<AddressedTuple> local = new ArrayList<>();
    +        Map<Integer, List<TaskMessage>> remoteMap = new HashMap<>();
    +        for (AddressedTuple addressedTuple : tupleBatch) {
    +            int destTask = addressedTuple.getDest();
    +            if (taskIds.contains(destTask)) {
    +                // Local task
    +                local.add(addressedTuple);
    +            } else {
    +                // Using java objects directly to avoid performance issues in java code
    +                if (! remoteMap.containsKey(destTask)) {
    +                    remoteMap.put(destTask, new ArrayList<>());
    +                }
    +                remoteMap.get(destTask).add(new TaskMessage(destTask, serializer.serialize(addressedTuple.getTuple())));
    +            }
    +        }
    +
    +        if (!local.isEmpty()) {
    +            transferLocal(local);
    +        }
    +        if (!remoteMap.isEmpty()) {
    +            transferQueue.publish(remoteMap);
    +        }
    +    }
    +
    +    // TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
    +    public void sendTuplesToRemoteWorker(HashMap<Integer, ArrayList<TaskMessage>> packets, long seqId, boolean batchEnd) {
    +        drainer.add(packets);
    +        if (batchEnd) {
    +            ReentrantReadWriteLock.ReadLock readLock = endpointSocketLock.readLock();
    +            try {
    +                readLock.lock();
    +                drainer.send(cachedTaskToNodePort.get(), cachedNodeToPortSocket.get());
    +            } finally {
    +                readLock.unlock();
    +            }
    +            drainer.clear();
    +        }
    +    }
    +
    +
    +    private void assertCanSerialize(KryoTupleSerializer serializer, List<AddressedTuple> tuples) {
    +        // Check that all of the tuples can be serialized by serializing them
    +        for (AddressedTuple addressedTuple : tuples) {
    +            serializer.serialize(addressedTuple.getTuple());
    +        }
    +    }
    +
    +    public WorkerTopologyContext getWorkerTopologyContext() {
    +        try {
    +            String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, topologyId));
    +            String pidDir = ConfigUtils.workerPidsRoot(conf, topologyId);
    +            return new WorkerTopologyContext(systemTopology, topologyConf, taskToComponent, componentToSortedTasks,
    +                componentToStreamToFields, topologyId, codeDir, pidDir, port, taskIds,
    +                defaultSharedResources,
    +                userSharedResources);
    +        } catch (IOException e) {
    +            throw Utils.wrapInRuntime(e);
    +        }
    +    }
    +
    +    public void runWorkerStartHooks() {
    +        WorkerTopologyContext workerContext = getWorkerTopologyContext();
    +        for (ByteBuffer hook : topology.get_worker_hooks()) {
    +            byte[] hookBytes = Utils.toByteArray(hook);
    +            BaseWorkerHook hookObject = Utils.javaDeserialize(hookBytes, BaseWorkerHook.class);
    +            hookObject.start(topologyConf, workerContext);
    +
    +        }
    +    }
    +
    +    public void runWorkerShutdownHooks() {
    +        for (ByteBuffer hook : topology.get_worker_hooks()) {
    +            byte[] hookBytes = Utils.toByteArray(hook);
    +            BaseWorkerHook hookObject = Utils.javaDeserialize(hookBytes, BaseWorkerHook.class);
    +            hookObject.shutdown();
    +
    +        }
    +    }
    +
    +    public void closeResources() {
    +        LOG.info("Shutting down default resources");
    +        ((ExecutorService) defaultSharedResources.get(WorkerTopologyContext.SHARED_EXECUTOR)).shutdownNow();
    +        LOG.info("Shut down default resources");
    +    }
    +
    +    public boolean areAllConnectionsReady() {
    +        return cachedNodeToPortSocket.get().values()
    +            .stream()
    +            .map(WorkerState::isConnectionReady)
    +            .reduce((left, right) -> left && right)
    +            .orElse(true);
    +    }
    +
    +    public static boolean isConnectionReady(IConnection connection) {
    +        return !(connection instanceof ConnectionWithStatus)
    +            || ((ConnectionWithStatus) connection).status() == ConnectionWithStatus.Status.Ready;
    +    }
    +
    +    private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
    +        int port) {
    +        LOG.info("Reading assignments");
    +        List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
    +        executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
    +        Map<List<Long>, NodeInfo> executorToNodePort =
    +            stormClusterState.assignmentInfo(topologyId, null).get_executor_node_port();
    +        for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
    +            NodeInfo nodeInfo = entry.getValue();
    +            if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    +                executorsAssignedToThisWorker.add(entry.getKey());
    +            }
    +        }
    +        return executorsAssignedToThisWorker;
    +    }
    +
    +    private Map<List<Long>, DisruptorQueue> mkReceiveQueueMap(Map topologyConf, Set<List<Long>> executors) {
    +        Map<List<Long>, DisruptorQueue> receiveQueueMap = new HashMap<>();
    +        for (List<Long> executor : executors) {
    +            receiveQueueMap.put(executor, new DisruptorQueue("receive-queue",
    +                Utils.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE)),
    +                (long) topologyConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS),
    +                Utils.getInt(topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE)),
    +                (long) topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS)));
    +        }
    +        return receiveQueueMap;
    +    }
    +    
    +    private Map<String, Object> makeDefaultResources() {
    +        int threadPoolSize = Utils.getInt(conf.get(Config.TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE));
    +        return ImmutableMap.of(WorkerTopologyContext.SHARED_EXECUTOR, Executors.newFixedThreadPool(threadPoolSize));
    +    }
    +    
    +    private Map<String, Object> makeUserResources() {
    +        /* TODO: need to invoke a hook provided by the topology, giving it a chance to create user resources.
    +        * this would be part of the initialization hook
    +        * need to separate workertopologycontext into WorkerContext and WorkerUserContext.
    +        * actually just do it via interfaces. just need to make sure to hide setResource from tasks
    +        */
    +        return new HashMap<>();
    +    }
    +
    +    private StormTimer mkHaltingTimer(String name) {
    +        return new StormTimer(name, (thread, exception) -> {
    +            LOG.error("Error when processing event", exception);
    +            Utils.exitProcess(20, "Error when processing an event");
    +        });
    +    }
    +
    +    /**
    +     *
    +     * @return seq of task ids that receive messages from this worker
    +     */
    +    private List<Integer> workerOutboundTasks() {
    +        WorkerTopologyContext context = getWorkerTopologyContext();
    +        List<String> components = new ArrayList<>();
    +        for (Integer taskId : taskIds) {
    +            for (Map<String, Grouping> value : context.getTargets(context.getComponentId(taskId)).values()) {
    +                components.addAll(value.keySet());
    +            }
    +        }
    +
    +        List<Integer> outboundTasks = new ArrayList<>();
    --- End diff --
    
    Correct me if I'm wrong. I guess this should be a Set instead of List.
    
    ```
    -    (-> worker		
     -        :task->component		
     -        (Utils/reverseMap)		
     -        clojurify-structure		
     -        (select-keys components)		
     -        vals		
     -        flatten		
     -        set )))
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87191222
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.base.Preconditions;
    +import com.lmax.disruptor.EventHandler;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.lang.ObjectUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.*;
    +import org.apache.storm.daemon.DaemonCommon;
    +import org.apache.storm.daemon.Shutdownable;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.executor.Executor;
    +import org.apache.storm.executor.ExecutorShutdown;
    +import org.apache.storm.executor.IRunningExecutor;
    +import org.apache.storm.executor.LocalExecutor;
    +import org.apache.storm.generated.*;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.IAutoCredentials;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.*;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
    +
    +import javax.security.auth.Subject;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.*;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +public class Worker implements Shutdownable, DaemonCommon {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
    +    private final Map conf;
    +    private final IContext context;
    +    private final String topologyId;
    +    private final String assignmentId;
    +    private final int port;
    +    private final String workerId;
    +    private final LogConfigManager logConfigManager;
    +
    +
    +    private WorkerState workerState;
    +    private AtomicReference<List<IRunningExecutor>> executorsAtom;
    +    private Thread transferThread;
    +    private WorkerBackpressureThread backpressureThread;
    +
    +    private AtomicReference<Credentials> credentialsAtom;
    +    private Subject subject;
    +    private Collection<IAutoCredentials> autoCreds;
    +
    +
    +    /**
    +     * TODO: should worker even take the topologyId as input? this should be
    +     * deducable from cluster state (by searching through assignments)
    +     * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency
    +     *
    +     * @param conf         - Storm configuration
    +     * @param context      -
    +     * @param topologyId   - topology id
    +     * @param assignmentId - assignement id
    +     * @param port         - port on which the worker runs
    +     * @param workerId     - worker id
    +     */
    +
    +    public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
    +        this.conf = conf;
    +        this.context = context;
    +        this.topologyId = topologyId;
    +        this.assignmentId = assignmentId;
    +        this.port = port;
    +        this.workerId = workerId;
    +        this.logConfigManager = new LogConfigManager();
    +    }
    +
    +    public void start() throws Exception {
    +        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
    +            conf);
    +        // because in local mode, its not a separate
    +        // process. supervisor will register it in this case
    +        // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
    +        if (!ConfigUtils.isLocalMode(conf)) {
    +            // Distributed mode
    +            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
    +            String pid = Utils.processPid();
    +            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
    +            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
    +                Charset.forName("UTF-8"));
    +        }
    +        final Map topologyConf =
    +            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
    +        List<ACL> acls = Utils.getWorkerACL(topologyConf);
    +        IStateStorage stateStorage =
    +            ClusterUtils.mkStateStorage(conf, topologyConf, acls, new ClusterStateContext(DaemonType.WORKER));
    +        IStormClusterState stormClusterState =
    +            ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext());
    +        Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
    +        autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
    +        subject = AuthUtils.populateSubject(null, autoCreds, initialCredentials.get_creds());
    +
    +        Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
    +            @Override public Object run() throws Exception {
    +                workerState =
    +                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
    +                        stormClusterState);
    +
    +                // Heartbeat here so that worker process dies if this fails
    +                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    +                // that worker is running and moves on
    +                doHeartBeat();
    +
    +                executorsAtom = new AtomicReference<>(null);
    +
    +                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    +                // to the supervisor
    +                workerState.heartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                        try {
    +                            doHeartBeat();
    +                        } catch (IOException e) {
    +                            throw new RuntimeException(e);
    +                        }
    +                    });
    +
    +                workerState.executorHeartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +                        Worker.this::doExecutorHeartbeats);
    +
    +                workerState.registerCallbacks();
    +
    +                workerState.refreshConnections(null);
    +
    +                workerState.activateWorkerWhenAllConnectionsReady();
    +
    +                workerState.refreshStormActive(null);
    +
    +                workerState.runWorkerStartHooks();
    +
    +                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    +                for (List<Long> e : workerState.getExecutors()) {
    +                    if (ConfigUtils.isLocalMode(topologyConf)) {
    +                        newExecutors.add(
    +                            LocalExecutor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    } else {
    +                        newExecutors.add(
    +                            Executor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    }
    +                }
    +                executorsAtom.set(newExecutors);
    +
    +                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
    +                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +
    +                // This thread will publish the messages destined for remote tasks to remote connections
    +                transferThread = Utils.asyncLoop(() -> {
    +                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    +                    return 0L;
    +                });
    +
    +                DisruptorBackpressureCallback disruptorBackpressureHandler =
    +                    mkDisruptorBackpressureHandler(workerState);
    +                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    +                workerState.transferQueue
    +                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    +                workerState.transferQueue
    +                    .setHighWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    +                workerState.transferQueue
    +                    .setLowWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    +
    +                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
    +                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
    +                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    +                    backpressureThread.start();
    +                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
    +                }
    +
    +                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
    +
    +                establishLogSettingCallback();
    +
    +                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
    +
    +                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
    +                        @Override public void run() {
    +                            checkCredentialsChanged();
    +                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    +                               checkThrottleChanged();
    +                            }
    +                        }
    +                    });
    +
    +                // The jitter allows the clients to get the data at different times, and avoids thundering herd
    +                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    +                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
    +                }
    +
    +                workerState.refreshConnectionsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
    +
    +                workerState.resetLogTevelsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
    +
    +                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    +                    workerState::refreshStormActive);
    +
    +                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    +                LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
    +                return this;
    +            };
    +        });
    +
    +    }
    +
    +    public void doHeartBeat() throws IOException {
    +        LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
    +        state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
    +            workerState.executors.stream()
    +                .map(executor -> new ExecutorInfo(executor.get(0).intValue(), executor.get(1).intValue()))
    +                .collect(Collectors.toList()), workerState.port));
    +        state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
    +        // it shouldn't take supervisor 120 seconds between listing dir and reading it
    +    }
    +
    +    public void doExecutorHeartbeats() {
    +        Map<List<Integer>, ExecutorStats> stats;
    +        List<IRunningExecutor> executors = this.executorsAtom.get();
    +        if (null == executors) {
    +            stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.executors);
    +        } else {
    +            stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
    +                .toMap((Function<IRunningExecutor, List<Long>>) IRunningExecutor::getExecutorId,
    +                    (Function<IRunningExecutor, ExecutorStats>) IRunningExecutor::renderStats)));
    +        }
    +        Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
    +        try {
    +            workerState.stormClusterState
    +                .workerHeartbeat(workerState.topologyId, workerState.assignmentId, (long) workerState.port,
    +                    StatsUtil.thriftifyZkWorkerHb(zkHB));
    +        } catch (Exception ex) {
    +            LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", ex);
    +        }
    +    }
    +
    +    public void checkCredentialsChanged() {
    +        Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null);
    +        if (! ObjectUtils.equals(newCreds, credentialsAtom.get())) {
    +            // This does not have to be atomic, worst case we update when one is not needed
    +            AuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? null : newCreds.get_creds());
    +            for (IRunningExecutor executor : executorsAtom.get()) {
    +                executor.credenetialsChanged(newCreds);
    +            }
    +            credentialsAtom.set(newCreds);
    +        }
    +    }
    +
    +    public void checkThrottleChanged() {
    +        boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged);
    +        workerState.throttleOn.set(throttleOn);
    +    }
    +
    +    public void checkLogConfigChanged() {
    +        LogConfig logConfig = workerState.stormClusterState.topologyLogConfig(topologyId, null);
    +        logConfigManager.processLogConfigChange(logConfig);
    +        establishLogSettingCallback();
    +    }
    +
    +    public void establishLogSettingCallback() {
    +        workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged);
    +    }
    +
    +
    +    /**
    +     * make a handler for the worker's send disruptor queue to
    +     * check highWaterMark and lowWaterMark for backpressure
    +     */
    +    private DisruptorBackpressureCallback mkDisruptorBackpressureHandler(WorkerState workerState) {
    +        return new DisruptorBackpressureCallback() {
    +            @Override public void highWaterMark() throws Exception {
    +                workerState.transferBackpressure.set(true);
    --- End diff --
    
    Before translation it logs to debug:
    
    ```
    -      (log-debug "worker " (:worker-id worker) " transfer-queue is congested, set backpressure flag true")
    ```
    
    and just calls notifyBackpressureChecker(), not setting transferBackpressure. Is it newly added?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on the issue:

    https://github.com/apache/storm/pull/1756
  
    That's good to hear Robert. I will resolve the conflicts and incorporate your suggestions. Thank you for your help. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87190721
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.base.Preconditions;
    +import com.lmax.disruptor.EventHandler;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.lang.ObjectUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.*;
    +import org.apache.storm.daemon.DaemonCommon;
    +import org.apache.storm.daemon.Shutdownable;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.executor.Executor;
    +import org.apache.storm.executor.ExecutorShutdown;
    +import org.apache.storm.executor.IRunningExecutor;
    +import org.apache.storm.executor.LocalExecutor;
    +import org.apache.storm.generated.*;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.IAutoCredentials;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.*;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
    +
    +import javax.security.auth.Subject;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.*;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +public class Worker implements Shutdownable, DaemonCommon {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
    +    private final Map conf;
    +    private final IContext context;
    +    private final String topologyId;
    +    private final String assignmentId;
    +    private final int port;
    +    private final String workerId;
    +    private final LogConfigManager logConfigManager;
    +
    +
    +    private WorkerState workerState;
    +    private AtomicReference<List<IRunningExecutor>> executorsAtom;
    +    private Thread transferThread;
    +    private WorkerBackpressureThread backpressureThread;
    +
    +    private AtomicReference<Credentials> credentialsAtom;
    +    private Subject subject;
    +    private Collection<IAutoCredentials> autoCreds;
    +
    +
    +    /**
    +     * TODO: should worker even take the topologyId as input? this should be
    +     * deducable from cluster state (by searching through assignments)
    +     * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency
    +     *
    +     * @param conf         - Storm configuration
    +     * @param context      -
    +     * @param topologyId   - topology id
    +     * @param assignmentId - assignement id
    +     * @param port         - port on which the worker runs
    +     * @param workerId     - worker id
    +     */
    +
    +    public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
    +        this.conf = conf;
    +        this.context = context;
    +        this.topologyId = topologyId;
    +        this.assignmentId = assignmentId;
    +        this.port = port;
    +        this.workerId = workerId;
    +        this.logConfigManager = new LogConfigManager();
    +    }
    +
    +    public void start() throws Exception {
    +        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
    +            conf);
    +        // because in local mode, its not a separate
    +        // process. supervisor will register it in this case
    +        // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
    +        if (!ConfigUtils.isLocalMode(conf)) {
    +            // Distributed mode
    +            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
    +            String pid = Utils.processPid();
    +            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
    +            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
    +                Charset.forName("UTF-8"));
    +        }
    +        final Map topologyConf =
    +            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
    +        List<ACL> acls = Utils.getWorkerACL(topologyConf);
    +        IStateStorage stateStorage =
    +            ClusterUtils.mkStateStorage(conf, topologyConf, acls, new ClusterStateContext(DaemonType.WORKER));
    +        IStormClusterState stormClusterState =
    +            ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext());
    +        Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
    +        autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
    +        subject = AuthUtils.populateSubject(null, autoCreds, initialCredentials.get_creds());
    +
    +        Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
    +            @Override public Object run() throws Exception {
    +                workerState =
    +                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
    +                        stormClusterState);
    +
    +                // Heartbeat here so that worker process dies if this fails
    +                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    +                // that worker is running and moves on
    +                doHeartBeat();
    +
    +                executorsAtom = new AtomicReference<>(null);
    +
    +                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    +                // to the supervisor
    +                workerState.heartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                        try {
    +                            doHeartBeat();
    +                        } catch (IOException e) {
    +                            throw new RuntimeException(e);
    +                        }
    +                    });
    +
    +                workerState.executorHeartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +                        Worker.this::doExecutorHeartbeats);
    +
    +                workerState.registerCallbacks();
    +
    +                workerState.refreshConnections(null);
    +
    +                workerState.activateWorkerWhenAllConnectionsReady();
    +
    +                workerState.refreshStormActive(null);
    +
    +                workerState.runWorkerStartHooks();
    +
    +                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    +                for (List<Long> e : workerState.getExecutors()) {
    +                    if (ConfigUtils.isLocalMode(topologyConf)) {
    +                        newExecutors.add(
    +                            LocalExecutor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    } else {
    +                        newExecutors.add(
    +                            Executor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    }
    +                }
    +                executorsAtom.set(newExecutors);
    +
    +                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
    +                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +
    +                // This thread will publish the messages destined for remote tasks to remote connections
    +                transferThread = Utils.asyncLoop(() -> {
    +                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    +                    return 0L;
    +                });
    +
    +                DisruptorBackpressureCallback disruptorBackpressureHandler =
    +                    mkDisruptorBackpressureHandler(workerState);
    +                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    +                workerState.transferQueue
    +                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    +                workerState.transferQueue
    +                    .setHighWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    +                workerState.transferQueue
    +                    .setLowWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    +
    +                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
    +                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
    +                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    +                    backpressureThread.start();
    +                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
    +                }
    +
    +                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
    +
    +                establishLogSettingCallback();
    +
    +                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
    +
    +                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
    +                        @Override public void run() {
    +                            checkCredentialsChanged();
    +                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    +                               checkThrottleChanged();
    +                            }
    +                        }
    +                    });
    +
    +                // The jitter allows the clients to get the data at different times, and avoids thundering herd
    +                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    +                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
    +                }
    +
    +                workerState.refreshConnectionsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
    +
    +                workerState.resetLogTevelsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
    +
    +                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    +                    workerState::refreshStormActive);
    +
    +                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    +                LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
    +                return this;
    +            };
    +        });
    +
    +    }
    +
    +    public void doHeartBeat() throws IOException {
    +        LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
    +        state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
    +            workerState.executors.stream()
    +                .map(executor -> new ExecutorInfo(executor.get(0).intValue(), executor.get(1).intValue()))
    +                .collect(Collectors.toList()), workerState.port));
    +        state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
    +        // it shouldn't take supervisor 120 seconds between listing dir and reading it
    +    }
    +
    +    public void doExecutorHeartbeats() {
    +        Map<List<Integer>, ExecutorStats> stats;
    +        List<IRunningExecutor> executors = this.executorsAtom.get();
    +        if (null == executors) {
    +            stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.executors);
    +        } else {
    +            stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
    +                .toMap((Function<IRunningExecutor, List<Long>>) IRunningExecutor::getExecutorId,
    +                    (Function<IRunningExecutor, ExecutorStats>) IRunningExecutor::renderStats)));
    +        }
    +        Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
    +        try {
    +            workerState.stormClusterState
    +                .workerHeartbeat(workerState.topologyId, workerState.assignmentId, (long) workerState.port,
    +                    StatsUtil.thriftifyZkWorkerHb(zkHB));
    +        } catch (Exception ex) {
    +            LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", ex);
    +        }
    +    }
    +
    +    public void checkCredentialsChanged() {
    +        Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null);
    +        if (! ObjectUtils.equals(newCreds, credentialsAtom.get())) {
    +            // This does not have to be atomic, worst case we update when one is not needed
    +            AuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? null : newCreds.get_creds());
    +            for (IRunningExecutor executor : executorsAtom.get()) {
    +                executor.credenetialsChanged(newCreds);
    +            }
    +            credentialsAtom.set(newCreds);
    +        }
    +    }
    +
    +    public void checkThrottleChanged() {
    +        boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged);
    +        workerState.throttleOn.set(throttleOn);
    +    }
    +
    +    public void checkLogConfigChanged() {
    +        LogConfig logConfig = workerState.stormClusterState.topologyLogConfig(topologyId, null);
    +        logConfigManager.processLogConfigChange(logConfig);
    +        establishLogSettingCallback();
    +    }
    +
    +    public void establishLogSettingCallback() {
    +        workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged);
    +    }
    +
    +
    +    /**
    +     * make a handler for the worker's send disruptor queue to
    +     * check highWaterMark and lowWaterMark for backpressure
    +     */
    +    private DisruptorBackpressureCallback mkDisruptorBackpressureHandler(WorkerState workerState) {
    +        return new DisruptorBackpressureCallback() {
    +            @Override public void highWaterMark() throws Exception {
    +                workerState.transferBackpressure.set(true);
    +                WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
    +            }
    +
    +            @Override public void lowWaterMark() throws Exception {
    +                workerState.transferBackpressure.set(false);
    +                WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
    +            }
    +        };
    +    }
    +
    +    /**
    +     * make a handler that checks and updates worker's backpressure flag
    +     */
    +    private WorkerBackpressureCallback mkBackpressureHandler() {
    +        final List<IRunningExecutor> executors = executorsAtom.get();
    +        return new WorkerBackpressureCallback() {
    +            @Override public void onEvent(Object obj) {
    +                String topologyId = workerState.topologyId;
    +                String assignmentId = workerState.assignmentId;
    +                int port = workerState.port;
    +                IStormClusterState stormClusterState = workerState.stormClusterState;
    +                boolean prevBackpressureFlag = workerState.backpressure.get();
    +                boolean currBackpressureFlag = prevBackpressureFlag;
    +                if (null != executors) {
    +                    currBackpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
    +                        .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
    +                }
    +
    +                if (currBackpressureFlag != prevBackpressureFlag) {
    +                    try {
    +                        LOG.debug("worker backpressure flag changing from {} to {}", prevBackpressureFlag, currBackpressureFlag);
    +                        stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureFlag);
    --- End diff --
    
    Below lines seems not translated here:
    
    ```
                ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception		
    -            (.set (:backpressure worker) curr-backpressure-flag)
    ```
    
    Is it moved, or just missed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1756
  
    OK I'll get started on this.  Should hopefully have something up soon.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1756
  
    @abhishekagarwal87 happy to take a look.  I'll see what I can do on the tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r86233780
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/Task.java ---
    @@ -179,22 +178,21 @@ public BuiltinMetrics getBuiltInMetrics() {
         }
     
         private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
    -        Map conf = (Map) workerData.get(Constants.CONF);
    +        Map conf = workerData.getConf();
             return new TopologyContext(
    -                topology,
    -                (Map) workerData.get(Constants.STORM_CONF),
    -                (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT),
    -                (Map<String, List<Integer>>) workerData.get(Constants.COMPONENT_TO_SORTED_TASKS),
    -                (Map<String, Map<String, Fields>>) workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS),
    -                (String) workerData.get(Constants.STORM_ID),
    +            topology,
    +            workerData.getTopologyConf(),
    +            workerData.getTaskToComponent(),
    +            workerData.getComponentToSortedTasks(),
    +            workerData.getComponentToStreamToFields(),
    +            workerData.getTopologyId(),
                     ConfigUtils.supervisorStormResourcesPath(
    -                        ConfigUtils.supervisorStormDistRoot(conf, (String) workerData.get(Constants.STORM_ID))),
    -                ConfigUtils.workerPidsRoot(conf, (String) workerData.get(Constants.WORKER_ID)),
    +                        ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
    +                ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
                     taskId,
    -                (Integer) workerData.get(Constants.PORT),
    -                (List<Integer>) workerData.get(Constants.TASK_IDS),
    -                (Map<String, Object>) workerData.get(Constants.DEFAULT_SHARED_RESOURCES),
    -                (Map<String, Object>) workerData.get(Constants.USER_SHARED_RESOURCES),
    +                workerData.getPort(), workerData.getTaskIds(),
    +                workerData.getDefaultSharedResources(),
    +               workerData.getUserSharedResources(),
    --- End diff --
    
    nit: indentation is off


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on the issue:

    https://github.com/apache/storm/pull/1756
  
    I dont mind. Sorry about this. But it will take me sometime before I get back to the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87191290
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.base.Preconditions;
    +import com.lmax.disruptor.EventHandler;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.lang.ObjectUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.*;
    +import org.apache.storm.daemon.DaemonCommon;
    +import org.apache.storm.daemon.Shutdownable;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.executor.Executor;
    +import org.apache.storm.executor.ExecutorShutdown;
    +import org.apache.storm.executor.IRunningExecutor;
    +import org.apache.storm.executor.LocalExecutor;
    +import org.apache.storm.generated.*;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.IAutoCredentials;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.*;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
    +
    +import javax.security.auth.Subject;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.*;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +public class Worker implements Shutdownable, DaemonCommon {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
    +    private final Map conf;
    +    private final IContext context;
    +    private final String topologyId;
    +    private final String assignmentId;
    +    private final int port;
    +    private final String workerId;
    +    private final LogConfigManager logConfigManager;
    +
    +
    +    private WorkerState workerState;
    +    private AtomicReference<List<IRunningExecutor>> executorsAtom;
    +    private Thread transferThread;
    +    private WorkerBackpressureThread backpressureThread;
    +
    +    private AtomicReference<Credentials> credentialsAtom;
    +    private Subject subject;
    +    private Collection<IAutoCredentials> autoCreds;
    +
    +
    +    /**
    +     * TODO: should worker even take the topologyId as input? this should be
    +     * deducable from cluster state (by searching through assignments)
    +     * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency
    +     *
    +     * @param conf         - Storm configuration
    +     * @param context      -
    +     * @param topologyId   - topology id
    +     * @param assignmentId - assignement id
    +     * @param port         - port on which the worker runs
    +     * @param workerId     - worker id
    +     */
    +
    +    public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
    +        this.conf = conf;
    +        this.context = context;
    +        this.topologyId = topologyId;
    +        this.assignmentId = assignmentId;
    +        this.port = port;
    +        this.workerId = workerId;
    +        this.logConfigManager = new LogConfigManager();
    +    }
    +
    +    public void start() throws Exception {
    +        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
    +            conf);
    +        // because in local mode, its not a separate
    +        // process. supervisor will register it in this case
    +        // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
    +        if (!ConfigUtils.isLocalMode(conf)) {
    +            // Distributed mode
    +            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
    +            String pid = Utils.processPid();
    +            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
    +            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
    +                Charset.forName("UTF-8"));
    +        }
    +        final Map topologyConf =
    +            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
    +        List<ACL> acls = Utils.getWorkerACL(topologyConf);
    +        IStateStorage stateStorage =
    +            ClusterUtils.mkStateStorage(conf, topologyConf, acls, new ClusterStateContext(DaemonType.WORKER));
    +        IStormClusterState stormClusterState =
    +            ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext());
    +        Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
    +        autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
    +        subject = AuthUtils.populateSubject(null, autoCreds, initialCredentials.get_creds());
    +
    +        Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
    +            @Override public Object run() throws Exception {
    +                workerState =
    +                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
    +                        stormClusterState);
    +
    +                // Heartbeat here so that worker process dies if this fails
    +                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    +                // that worker is running and moves on
    +                doHeartBeat();
    +
    +                executorsAtom = new AtomicReference<>(null);
    +
    +                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    +                // to the supervisor
    +                workerState.heartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                        try {
    +                            doHeartBeat();
    +                        } catch (IOException e) {
    +                            throw new RuntimeException(e);
    +                        }
    +                    });
    +
    +                workerState.executorHeartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +                        Worker.this::doExecutorHeartbeats);
    +
    +                workerState.registerCallbacks();
    +
    +                workerState.refreshConnections(null);
    +
    +                workerState.activateWorkerWhenAllConnectionsReady();
    +
    +                workerState.refreshStormActive(null);
    +
    +                workerState.runWorkerStartHooks();
    +
    +                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    +                for (List<Long> e : workerState.getExecutors()) {
    +                    if (ConfigUtils.isLocalMode(topologyConf)) {
    +                        newExecutors.add(
    +                            LocalExecutor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    } else {
    +                        newExecutors.add(
    +                            Executor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    }
    +                }
    +                executorsAtom.set(newExecutors);
    +
    +                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
    +                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +
    +                // This thread will publish the messages destined for remote tasks to remote connections
    +                transferThread = Utils.asyncLoop(() -> {
    +                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    +                    return 0L;
    +                });
    +
    +                DisruptorBackpressureCallback disruptorBackpressureHandler =
    +                    mkDisruptorBackpressureHandler(workerState);
    +                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    +                workerState.transferQueue
    +                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    +                workerState.transferQueue
    +                    .setHighWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    +                workerState.transferQueue
    +                    .setLowWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    +
    +                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
    +                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
    +                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    +                    backpressureThread.start();
    +                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
    +                }
    +
    +                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
    +
    +                establishLogSettingCallback();
    +
    +                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
    +
    +                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
    +                        @Override public void run() {
    +                            checkCredentialsChanged();
    +                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    +                               checkThrottleChanged();
    +                            }
    +                        }
    +                    });
    +
    +                // The jitter allows the clients to get the data at different times, and avoids thundering herd
    +                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    +                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
    +                }
    +
    +                workerState.refreshConnectionsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
    +
    +                workerState.resetLogTevelsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
    +
    +                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    +                    workerState::refreshStormActive);
    +
    +                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    +                LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
    +                return this;
    +            };
    +        });
    +
    +    }
    +
    +    public void doHeartBeat() throws IOException {
    +        LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
    +        state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
    +            workerState.executors.stream()
    +                .map(executor -> new ExecutorInfo(executor.get(0).intValue(), executor.get(1).intValue()))
    +                .collect(Collectors.toList()), workerState.port));
    +        state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
    +        // it shouldn't take supervisor 120 seconds between listing dir and reading it
    +    }
    +
    +    public void doExecutorHeartbeats() {
    +        Map<List<Integer>, ExecutorStats> stats;
    +        List<IRunningExecutor> executors = this.executorsAtom.get();
    +        if (null == executors) {
    +            stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.executors);
    +        } else {
    +            stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
    +                .toMap((Function<IRunningExecutor, List<Long>>) IRunningExecutor::getExecutorId,
    +                    (Function<IRunningExecutor, ExecutorStats>) IRunningExecutor::renderStats)));
    +        }
    +        Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
    +        try {
    +            workerState.stormClusterState
    +                .workerHeartbeat(workerState.topologyId, workerState.assignmentId, (long) workerState.port,
    +                    StatsUtil.thriftifyZkWorkerHb(zkHB));
    +        } catch (Exception ex) {
    +            LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", ex);
    +        }
    +    }
    +
    +    public void checkCredentialsChanged() {
    +        Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null);
    +        if (! ObjectUtils.equals(newCreds, credentialsAtom.get())) {
    +            // This does not have to be atomic, worst case we update when one is not needed
    +            AuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? null : newCreds.get_creds());
    +            for (IRunningExecutor executor : executorsAtom.get()) {
    +                executor.credenetialsChanged(newCreds);
    +            }
    +            credentialsAtom.set(newCreds);
    +        }
    +    }
    +
    +    public void checkThrottleChanged() {
    +        boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged);
    +        workerState.throttleOn.set(throttleOn);
    +    }
    +
    +    public void checkLogConfigChanged() {
    +        LogConfig logConfig = workerState.stormClusterState.topologyLogConfig(topologyId, null);
    +        logConfigManager.processLogConfigChange(logConfig);
    +        establishLogSettingCallback();
    +    }
    +
    +    public void establishLogSettingCallback() {
    +        workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged);
    +    }
    +
    +
    +    /**
    +     * make a handler for the worker's send disruptor queue to
    +     * check highWaterMark and lowWaterMark for backpressure
    +     */
    +    private DisruptorBackpressureCallback mkDisruptorBackpressureHandler(WorkerState workerState) {
    +        return new DisruptorBackpressureCallback() {
    +            @Override public void highWaterMark() throws Exception {
    +                workerState.transferBackpressure.set(true);
    +                WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
    +            }
    +
    +            @Override public void lowWaterMark() throws Exception {
    +                workerState.transferBackpressure.set(false);
    --- End diff --
    
    Same above.
    
    ```
    -      (log-debug "worker " (:worker-id worker) " transfer-queue is not congested, set backpressure flag false")
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1756
  
    Looks great I am +1 even without the last comment I made (That code can go away when it is translated to java).
    
    But since I also contributed some of the test code changes I really would like someone else to have a look too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87902202
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.base.Preconditions;
    +import com.lmax.disruptor.EventHandler;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.lang.ObjectUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.*;
    +import org.apache.storm.daemon.DaemonCommon;
    +import org.apache.storm.daemon.Shutdownable;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.executor.Executor;
    +import org.apache.storm.executor.ExecutorShutdown;
    +import org.apache.storm.executor.IRunningExecutor;
    +import org.apache.storm.executor.LocalExecutor;
    +import org.apache.storm.generated.*;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.IAutoCredentials;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.*;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
    +
    +import javax.security.auth.Subject;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.*;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +public class Worker implements Shutdownable, DaemonCommon {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
    +    private final Map conf;
    +    private final IContext context;
    +    private final String topologyId;
    +    private final String assignmentId;
    +    private final int port;
    +    private final String workerId;
    +    private final LogConfigManager logConfigManager;
    +
    +
    +    private WorkerState workerState;
    +    private AtomicReference<List<IRunningExecutor>> executorsAtom;
    +    private Thread transferThread;
    +    private WorkerBackpressureThread backpressureThread;
    +
    +    private AtomicReference<Credentials> credentialsAtom;
    +    private Subject subject;
    +    private Collection<IAutoCredentials> autoCreds;
    +
    +
    +    /**
    +     * TODO: should worker even take the topologyId as input? this should be
    +     * deducable from cluster state (by searching through assignments)
    +     * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency
    +     *
    +     * @param conf         - Storm configuration
    +     * @param context      -
    +     * @param topologyId   - topology id
    +     * @param assignmentId - assignement id
    +     * @param port         - port on which the worker runs
    +     * @param workerId     - worker id
    +     */
    +
    +    public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
    +        this.conf = conf;
    +        this.context = context;
    +        this.topologyId = topologyId;
    +        this.assignmentId = assignmentId;
    +        this.port = port;
    +        this.workerId = workerId;
    +        this.logConfigManager = new LogConfigManager();
    +    }
    +
    +    public void start() throws Exception {
    +        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
    +            conf);
    +        // because in local mode, its not a separate
    +        // process. supervisor will register it in this case
    +        // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
    +        if (!ConfigUtils.isLocalMode(conf)) {
    +            // Distributed mode
    +            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
    +            String pid = Utils.processPid();
    +            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
    +            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
    +                Charset.forName("UTF-8"));
    +        }
    +        final Map topologyConf =
    +            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
    +        List<ACL> acls = Utils.getWorkerACL(topologyConf);
    +        IStateStorage stateStorage =
    +            ClusterUtils.mkStateStorage(conf, topologyConf, acls, new ClusterStateContext(DaemonType.WORKER));
    +        IStormClusterState stormClusterState =
    +            ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext());
    +        Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
    +        autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
    +        subject = AuthUtils.populateSubject(null, autoCreds, initialCredentials.get_creds());
    +
    +        Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
    +            @Override public Object run() throws Exception {
    +                workerState =
    +                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
    +                        stormClusterState);
    +
    +                // Heartbeat here so that worker process dies if this fails
    +                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    +                // that worker is running and moves on
    +                doHeartBeat();
    +
    +                executorsAtom = new AtomicReference<>(null);
    +
    +                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    +                // to the supervisor
    +                workerState.heartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                        try {
    +                            doHeartBeat();
    +                        } catch (IOException e) {
    +                            throw new RuntimeException(e);
    +                        }
    +                    });
    +
    +                workerState.executorHeartbeatTimer
    +                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +                        Worker.this::doExecutorHeartbeats);
    +
    +                workerState.registerCallbacks();
    +
    +                workerState.refreshConnections(null);
    +
    +                workerState.activateWorkerWhenAllConnectionsReady();
    +
    +                workerState.refreshStormActive(null);
    +
    +                workerState.runWorkerStartHooks();
    +
    +                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    +                for (List<Long> e : workerState.getExecutors()) {
    +                    if (ConfigUtils.isLocalMode(topologyConf)) {
    +                        newExecutors.add(
    +                            LocalExecutor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    } else {
    +                        newExecutors.add(
    +                            Executor.mkExecutor(workerState, e, initialCredentials.get_creds())
    +                                .execute());
    +                    }
    +                }
    +                executorsAtom.set(newExecutors);
    +
    +                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
    +                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +
    +                // This thread will publish the messages destined for remote tasks to remote connections
    +                transferThread = Utils.asyncLoop(() -> {
    +                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    +                    return 0L;
    +                });
    +
    +                DisruptorBackpressureCallback disruptorBackpressureHandler =
    +                    mkDisruptorBackpressureHandler(workerState);
    +                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    +                workerState.transferQueue
    +                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    +                workerState.transferQueue
    +                    .setHighWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    +                workerState.transferQueue
    +                    .setLowWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    +
    +                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
    +                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
    +                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    +                    backpressureThread.start();
    +                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
    +                }
    +
    +                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
    +
    +                establishLogSettingCallback();
    +
    +                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
    +
    +                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
    +                        @Override public void run() {
    +                            checkCredentialsChanged();
    +                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    +                               checkThrottleChanged();
    +                            }
    +                        }
    +                    });
    +
    +                // The jitter allows the clients to get the data at different times, and avoids thundering herd
    +                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    +                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
    +                }
    +
    +                workerState.refreshConnectionsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
    +
    +                workerState.resetLogTevelsTimer.scheduleRecurring(0,
    +                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
    +
    +                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    +                    workerState::refreshStormActive);
    +
    +                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    +                LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
    +                return this;
    +            };
    +        });
    +
    +    }
    +
    +    public void doHeartBeat() throws IOException {
    +        LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
    +        state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
    +            workerState.executors.stream()
    +                .map(executor -> new ExecutorInfo(executor.get(0).intValue(), executor.get(1).intValue()))
    +                .collect(Collectors.toList()), workerState.port));
    +        state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
    +        // it shouldn't take supervisor 120 seconds between listing dir and reading it
    +    }
    +
    +    public void doExecutorHeartbeats() {
    +        Map<List<Integer>, ExecutorStats> stats;
    +        List<IRunningExecutor> executors = this.executorsAtom.get();
    +        if (null == executors) {
    +            stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.executors);
    +        } else {
    +            stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
    +                .toMap((Function<IRunningExecutor, List<Long>>) IRunningExecutor::getExecutorId,
    +                    (Function<IRunningExecutor, ExecutorStats>) IRunningExecutor::renderStats)));
    +        }
    +        Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
    +        try {
    +            workerState.stormClusterState
    +                .workerHeartbeat(workerState.topologyId, workerState.assignmentId, (long) workerState.port,
    +                    StatsUtil.thriftifyZkWorkerHb(zkHB));
    +        } catch (Exception ex) {
    +            LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", ex);
    +        }
    +    }
    +
    +    public void checkCredentialsChanged() {
    +        Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null);
    +        if (! ObjectUtils.equals(newCreds, credentialsAtom.get())) {
    +            // This does not have to be atomic, worst case we update when one is not needed
    +            AuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? null : newCreds.get_creds());
    +            for (IRunningExecutor executor : executorsAtom.get()) {
    +                executor.credenetialsChanged(newCreds);
    +            }
    +            credentialsAtom.set(newCreds);
    +        }
    +    }
    +
    +    public void checkThrottleChanged() {
    +        boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged);
    +        workerState.throttleOn.set(throttleOn);
    +    }
    +
    +    public void checkLogConfigChanged() {
    +        LogConfig logConfig = workerState.stormClusterState.topologyLogConfig(topologyId, null);
    +        logConfigManager.processLogConfigChange(logConfig);
    +        establishLogSettingCallback();
    +    }
    +
    +    public void establishLogSettingCallback() {
    +        workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged);
    +    }
    +
    +
    +    /**
    +     * make a handler for the worker's send disruptor queue to
    +     * check highWaterMark and lowWaterMark for backpressure
    +     */
    +    private DisruptorBackpressureCallback mkDisruptorBackpressureHandler(WorkerState workerState) {
    +        return new DisruptorBackpressureCallback() {
    +            @Override public void highWaterMark() throws Exception {
    +                workerState.transferBackpressure.set(true);
    --- End diff --
    
    You are correct that was a bug that was fixed, but the logs were not fixed.  I'll update them to be correct.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87337480
  
    --- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -123,6 +123,7 @@
     import java.util.TreeMap;
     import java.util.UUID;
     import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.AtomicReference;
    --- End diff --
    
    Nit: new import without actual usage.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1756
  
    Perf numbers look good compared to the 1.x line (very non scientific though).  I didn't dig into it a lot.  Running throughput vs latency on my mac I saw the CPU utilization with this is about half of what it is on the 1.x line at 20k sentences per second.  Also the latencies all across the board are lower, not by a lot but a few ms.  Also the JITTER seems to be doing a lot better job on the java code and it was able to hit a steady state at 20k/sec in abut 30 seconds where as the clojure code was taking almost 3 mins to hit a steady state.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r86178116
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java ---
    @@ -0,0 +1,154 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import org.apache.logging.log4j.Level;
    +import org.apache.logging.log4j.LogManager;
    +import org.apache.logging.log4j.core.LoggerContext;
    +import org.apache.logging.log4j.core.config.Configuration;
    +import org.apache.logging.log4j.core.config.LoggerConfig;
    +import org.apache.storm.generated.LogConfig;
    +import org.apache.storm.generated.LogLevel;
    +import org.apache.storm.generated.LogLevelAction;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +public class LogConfigManager {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(LogConfigManager.class);
    +
    +    private final AtomicReference<TreeMap<String, LogLevel>> latestLogConfig;
    +    private final Map<String, Level> originalLogLevels;
    +
    +    public LogConfigManager() {
    +        this(new AtomicReference<>(new TreeMap<>()));
    +    }
    +
    +    public LogConfigManager(AtomicReference<TreeMap<String, LogLevel>> latestLogConfig) {
    +        this.latestLogConfig = latestLogConfig;
    +        this.originalLogLevels = getLoggerLevels();
    +        LOG.info("Started with log levels: {}", originalLogLevels);
    +    }
    +
    +    public void processLogConfigChange(LogConfig logConfig) {
    +        if (null != logConfig) {
    +            LOG.debug("Processing received log config: {}", logConfig);
    +            TreeMap<String, LogLevel> loggers = new TreeMap<>(logConfig.get_named_logger_level());
    +            LoggerContext logContext = (LoggerContext) LogManager.getContext();
    --- End diff --
    
    In the previous code it passed in a false.  Not sure if it makes much difference in our case though.
    
    https://logging.apache.org/log4j/2.x/log4j-api/apidocs/org/apache/logging/log4j/LogManager.html#getContext(boolean)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r86166475
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.base.Preconditions;
    +import com.lmax.disruptor.EventHandler;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.lang.ObjectUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.*;
    +import org.apache.storm.daemon.DaemonCommon;
    +import org.apache.storm.daemon.Shutdownable;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.executor.Executor;
    +import org.apache.storm.executor.ExecutorShutdown;
    +import org.apache.storm.executor.IRunningExecutor;
    +import org.apache.storm.executor.LocalExecutor;
    +import org.apache.storm.generated.*;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.IAutoCredentials;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.*;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
    +
    +import javax.security.auth.Subject;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.*;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +public class Worker implements Shutdownable, DaemonCommon {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
    +    private final Map conf;
    +    private final IContext context;
    +    private final String topologyId;
    +    private final String assignmentId;
    +    private final int port;
    +    private final String workerId;
    +    private final LogConfigManager logConfigManager;
    +
    +
    +    private WorkerState workerState;
    +    private AtomicReference<List<IRunningExecutor>> executorsAtom;
    +    private Thread transferThread;
    +    private WorkerBackpressureThread backpressureThread;
    +
    +    private AtomicReference<Credentials> credentialsAtom;
    +    private Subject subject;
    +    private Collection<IAutoCredentials> autoCreds;
    +
    +
    +    /**
    +     * TODO: should worker even take the topologyId as input? this should be
    +     * deducable from cluster state (by searching through assignments)
    +     * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency
    +     *
    +     * @param conf         - Storm configuration
    +     * @param context      -
    +     * @param topologyId   - topology id
    +     * @param assignmentId - assignement id
    +     * @param port         - port on which the worker runs
    +     * @param workerId     - worker id
    +     */
    +
    +    public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
    +        this.conf = conf;
    +        this.context = context;
    +        this.topologyId = topologyId;
    +        this.assignmentId = assignmentId;
    +        this.port = port;
    +        this.workerId = workerId;
    +        this.logConfigManager = new LogConfigManager();
    +    }
    +
    +    public void start() throws Exception {
    +        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
    +            conf);
    +        // because in local mode, its not a separate
    +        // process. supervisor will register it in this case
    +        // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
    +        if (!ConfigUtils.isLocalMode(conf)) {
    +            // Distributed mode
    +            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
    +            String pid = Utils.processPid();
    +            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
    +            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
    +                Charset.defaultCharset());
    --- End diff --
    
    Can we use UTF8 here instead of the default charset?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1756
  
    @abhishekagarwal87 I created a pull request to your repo for the failing tests 
    
    https://github.com/abhishekagarwal87/storm/pull/7
    
    I will keep looking at the pull request


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1756: STORM-1278: Port org.apache.storm.daemon.worker to java

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1756
  
    FYI I put up a separate pull request based off of this one at #1775 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87160199
  
    --- Diff: storm-core/src/jvm/org/apache/storm/Constants.java ---
    @@ -20,12 +20,15 @@
     import org.apache.storm.coordination.CoordinatedBolt;
     import clojure.lang.RT;
    --- End diff --
    
    Nitpick: We could remove RT now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---