You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2016/12/01 15:04:49 UTC

[GitHub] storm pull request #1744: STORM-1276: line for line translation of nimbus to...

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1744#discussion_r90464031
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -0,0 +1,3729 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon.nimbus;
    +
    +import static org.apache.storm.metric.StormMetricsRegistry.registerMeter;
    +import static org.apache.storm.utils.Utils.OR;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InterruptedIOException;
    +import java.io.OutputStream;
    +import java.net.BindException;
    +import java.net.ServerSocket;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.WritableByteChannel;
    +import java.security.Principal;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.UnaryOperator;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +import javax.security.auth.Subject;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.StormTimer;
    +import org.apache.storm.blobstore.AtomicOutputStream;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.BlobStoreAclHandler;
    +import org.apache.storm.blobstore.BlobSynchronizer;
    +import org.apache.storm.blobstore.InputStreamWithMeta;
    +import org.apache.storm.blobstore.KeySequenceNumber;
    +import org.apache.storm.blobstore.LocalFsBlobStore;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.daemon.DaemonCommon;
    +import org.apache.storm.daemon.Shutdownable;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.generated.AlreadyAliveException;
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.AuthorizationException;
    +import org.apache.storm.generated.BeginDownloadResult;
    +import org.apache.storm.generated.ClusterSummary;
    +import org.apache.storm.generated.CommonAggregateStats;
    +import org.apache.storm.generated.ComponentAggregateStats;
    +import org.apache.storm.generated.ComponentPageInfo;
    +import org.apache.storm.generated.ComponentType;
    +import org.apache.storm.generated.Credentials;
    +import org.apache.storm.generated.DebugOptions;
    +import org.apache.storm.generated.ErrorInfo;
    +import org.apache.storm.generated.ExecutorInfo;
    +import org.apache.storm.generated.ExecutorStats;
    +import org.apache.storm.generated.ExecutorSummary;
    +import org.apache.storm.generated.GetInfoOptions;
    +import org.apache.storm.generated.InvalidTopologyException;
    +import org.apache.storm.generated.KeyAlreadyExistsException;
    +import org.apache.storm.generated.KeyNotFoundException;
    +import org.apache.storm.generated.KillOptions;
    +import org.apache.storm.generated.LSTopoHistory;
    +import org.apache.storm.generated.ListBlobsResult;
    +import org.apache.storm.generated.LogConfig;
    +import org.apache.storm.generated.LogLevel;
    +import org.apache.storm.generated.LogLevelAction;
    +import org.apache.storm.generated.Nimbus.Iface;
    +import org.apache.storm.generated.Nimbus.Processor;
    +import org.apache.storm.generated.NimbusSummary;
    +import org.apache.storm.generated.NodeInfo;
    +import org.apache.storm.generated.NotAliveException;
    +import org.apache.storm.generated.NumErrorsChoice;
    +import org.apache.storm.generated.ProfileAction;
    +import org.apache.storm.generated.ProfileRequest;
    +import org.apache.storm.generated.ReadableBlobMeta;
    +import org.apache.storm.generated.RebalanceOptions;
    +import org.apache.storm.generated.SettableBlobMeta;
    +import org.apache.storm.generated.StormBase;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.SubmitOptions;
    +import org.apache.storm.generated.SupervisorInfo;
    +import org.apache.storm.generated.SupervisorPageInfo;
    +import org.apache.storm.generated.SupervisorSummary;
    +import org.apache.storm.generated.TopologyActionOptions;
    +import org.apache.storm.generated.TopologyHistoryInfo;
    +import org.apache.storm.generated.TopologyInfo;
    +import org.apache.storm.generated.TopologyInitialStatus;
    +import org.apache.storm.generated.TopologyPageInfo;
    +import org.apache.storm.generated.TopologyStatus;
    +import org.apache.storm.generated.TopologySummary;
    +import org.apache.storm.generated.WorkerResources;
    +import org.apache.storm.generated.WorkerSummary;
    +import org.apache.storm.logging.ThriftAccessLogger;
    +import org.apache.storm.metric.ClusterMetricsConsumerExecutor;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.metric.api.DataPoint;
    +import org.apache.storm.metric.api.IClusterMetricsConsumer;
    +import org.apache.storm.metric.api.IClusterMetricsConsumer.ClusterInfo;
    +import org.apache.storm.nimbus.DefaultTopologyValidator;
    +import org.apache.storm.nimbus.ILeaderElector;
    +import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
    +import org.apache.storm.nimbus.ITopologyValidator;
    +import org.apache.storm.nimbus.NimbusInfo;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.DefaultScheduler;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.INimbus;
    +import org.apache.storm.scheduler.IScheduler;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.SchedulerAssignmentImpl;
    +import org.apache.storm.scheduler.SupervisorDetails;
    +import org.apache.storm.scheduler.Topologies;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.ResourceUtils;
    +import org.apache.storm.security.INimbusCredentialPlugin;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.IAuthorizer;
    +import org.apache.storm.security.auth.ICredentialsRenewer;
    +import org.apache.storm.security.auth.IGroupMappingServiceProvider;
    +import org.apache.storm.security.auth.IPrincipalToLocal;
    +import org.apache.storm.security.auth.NimbusPrincipal;
    +import org.apache.storm.security.auth.ReqContext;
    +import org.apache.storm.security.auth.ThriftConnectionType;
    +import org.apache.storm.security.auth.ThriftServer;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.BufferInputStream;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.LocalState;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.TimeCacheMap;
    +import org.apache.storm.utils.TupleUtils;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.utils.Utils.UptimeComputer;
    +import org.apache.storm.utils.VersionInfo;
    +import org.apache.storm.validation.ConfigValidation;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.thrift.TException;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.json.simple.JSONValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.codahale.metrics.Meter;
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.ImmutableMap;
    +
    +public class Nimbus implements Iface, Shutdownable, DaemonCommon {
    +    // Class-wide SLF4J logger.
    +    private final static Logger LOG = LoggerFactory.getLogger(Nimbus.class);
    +    
    +    // Metrics: one Meter per operation, each registered under the key
    +    // "nimbus:num-<operation>-calls". Presumably each meter is marked when the
    +    // matching method is invoked -- the call sites are outside this section, confirm there.
    +    private static final Meter submitTopologyWithOptsCalls = registerMeter("nimbus:num-submitTopologyWithOpts-calls");
    +    private static final Meter submitTopologyCalls = registerMeter("nimbus:num-submitTopology-calls");
    +    private static final Meter killTopologyWithOptsCalls = registerMeter("nimbus:num-killTopologyWithOpts-calls");
    +    private static final Meter killTopologyCalls = registerMeter("nimbus:num-killTopology-calls");
    +    private static final Meter rebalanceCalls = registerMeter("nimbus:num-rebalance-calls");
    +    private static final Meter activateCalls = registerMeter("nimbus:num-activate-calls");
    +    private static final Meter deactivateCalls = registerMeter("nimbus:num-deactivate-calls");
    +    private static final Meter debugCalls = registerMeter("nimbus:num-debug-calls");
    +    private static final Meter setWorkerProfilerCalls = registerMeter("nimbus:num-setWorkerProfiler-calls");
    +    private static final Meter getComponentPendingProfileActionsCalls = registerMeter("nimbus:num-getComponentPendingProfileActions-calls");
    +    private static final Meter setLogConfigCalls = registerMeter("nimbus:num-setLogConfig-calls");
    +    private static final Meter uploadNewCredentialsCalls = registerMeter("nimbus:num-uploadNewCredentials-calls");
    +    private static final Meter beginFileUploadCalls = registerMeter("nimbus:num-beginFileUpload-calls");
    +    private static final Meter uploadChunkCalls = registerMeter("nimbus:num-uploadChunk-calls");
    +    private static final Meter finishFileUploadCalls = registerMeter("nimbus:num-finishFileUpload-calls");
    +    private static final Meter beginFileDownloadCalls = registerMeter("nimbus:num-beginFileDownload-calls");
    +    private static final Meter downloadChunkCalls = registerMeter("nimbus:num-downloadChunk-calls");
    +    private static final Meter getNimbusConfCalls = registerMeter("nimbus:num-getNimbusConf-calls");
    +    private static final Meter getLogConfigCalls = registerMeter("nimbus:num-getLogConfig-calls");
    +    private static final Meter getTopologyConfCalls = registerMeter("nimbus:num-getTopologyConf-calls");
    +    private static final Meter getTopologyCalls = registerMeter("nimbus:num-getTopology-calls");
    +    private static final Meter getUserTopologyCalls = registerMeter("nimbus:num-getUserTopology-calls");
    +    private static final Meter getClusterInfoCalls = registerMeter("nimbus:num-getClusterInfo-calls");
    +    private static final Meter getTopologyInfoWithOptsCalls = registerMeter("nimbus:num-getTopologyInfoWithOpts-calls");
    +    private static final Meter getTopologyInfoCalls = registerMeter("nimbus:num-getTopologyInfo-calls");
    +    private static final Meter getTopologyPageInfoCalls = registerMeter("nimbus:num-getTopologyPageInfo-calls");
    +    private static final Meter getSupervisorPageInfoCalls = registerMeter("nimbus:num-getSupervisorPageInfo-calls");
    +    private static final Meter getComponentPageInfoCalls = registerMeter("nimbus:num-getComponentPageInfo-calls");
    +    private static final Meter shutdownCalls = registerMeter("nimbus:num-shutdown-calls");
    +    // END Metrics
    +    
    +    // Version string obtained from VersionInfo once, when this class is initialized.
    +    private static final String STORM_VERSION = VersionInfo.getVersion();
    +    // Two ACL entries: the first entry of ZooKeeper's CREATOR_ALL_ACL list, plus
    +    // READ and CREATE permission for the ANYONE_ID_UNSAFE id.
    +    @VisibleForTesting
    +    public static final List<ACL> ZK_ACLS = Arrays.asList(ZooDefs.Ids.CREATOR_ALL_ACL.get(0),
    +            new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE));
    +    // Subject that holds a single NimbusPrincipal; populated and then made
    +    // read-only by the static initializer that follows.
    +    private static final Subject NIMBUS_SUBJECT = new Subject();
    +    static {
    +        NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
    +        // Freeze the subject so no further principals or credentials can be added.
    +        NIMBUS_SUBJECT.setReadOnly();
    +    }
    +    
    +    // TOPOLOGY STATE TRANSITIONS
    +    private static StormBase make(TopologyStatus status) {
    +        StormBase ret = new StormBase();
    +        ret.set_status(status);
    +        //The following are required for backwards compatibility with clojure code
    +        ret.set_component_executors(Collections.emptyMap());
    +        ret.set_component_debug(Collections.emptyMap());
    +        return ret;
    +    }
    +    
    +    // Transition that ignores its arguments and returns null.
    +    // NOTE(review): null presumably means "leave the stored base unchanged" -- confirm against the caller.
    +    private static final TopologyStateTransition NOOP_TRANSITION = (arg, nimbus, topoId, base) -> null;
    +    // Transition that yields a new base with status INACTIVE (built by make()).
    +    private static final TopologyStateTransition INACTIVE_TRANSITION = (arg, nimbus, topoId, base) -> Nimbus.make(TopologyStatus.INACTIVE);
    +    // Transition that yields a new base with status ACTIVE (built by make()).
    +    private static final TopologyStateTransition ACTIVE_TRANSITION = (arg, nimbus, topoId, base) -> Nimbus.make(TopologyStatus.ACTIVE);
    +    /**
    +     * Transition into KILLED. The wait time is the supplied kill time when one is
    +     * given, otherwise the topology's configured message timeout. A REMOVE event is
    +     * scheduled after that wait and the returned base records the wait in its kill options.
    +     */
    +    private static final TopologyStateTransition KILL_TRANSITION = (killTime, nimbus, topoId, base) -> {
    +        final int waitSecs;
    +        if (killTime == null) {
    +            // No explicit wait requested: fall back to the topology's message timeout
    +            waitSecs = Utils.getInt(Nimbus.readTopoConf(topoId, nimbus.getBlobStore()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
    +        } else {
    +            waitSecs = ((Number) killTime).intValue();
    +        }
    +        nimbus.delayEvent(topoId, waitSecs, TopologyActions.REMOVE, null);
    +
    +        final KillOptions killOpts = new KillOptions();
    +        killOpts.set_wait_secs(waitSecs);
    +        final TopologyActionOptions actionOpts = new TopologyActionOptions();
    +        actionOpts.set_kill_options(killOpts);
    +
    +        final StormBase killed = new StormBase();
    +        killed.set_status(TopologyStatus.KILLED);
    +        killed.set_topology_action_options(actionOpts);
    +        killed.set_component_executors(Collections.emptyMap());
    +        killed.set_component_debug(Collections.emptyMap());
    +        return killed;
    +    };
    +    
    +    /**
    +     * Transition into REBALANCING. Works on a deep copy of the supplied
    +     * RebalanceOptions; the wait time is taken from those options when set, otherwise
    +     * from the topology's configured message timeout. A DO_REBALANCE event is scheduled
    +     * after that wait, and the returned base remembers the current status as its
    +     * previous status.
    +     */
    +    private static final TopologyStateTransition REBALANCE_TRANSITION = (args, nimbus, topoId, base) -> {
    +        final RebalanceOptions opts = ((RebalanceOptions) args).deepCopy();
    +        final int waitSecs;
    +        if (!opts.is_set_wait_secs()) {
    +            // No explicit wait requested: fall back to the topology's message timeout
    +            waitSecs = Utils.getInt(Nimbus.readTopoConf(topoId, nimbus.getBlobStore()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
    +        } else {
    +            waitSecs = opts.get_wait_secs();
    +        }
    +        nimbus.delayEvent(topoId, waitSecs, TopologyActions.DO_REBALANCE, null);
    +
    +        // Record the effective wait and make sure num_executors is always set
    +        opts.set_wait_secs(waitSecs);
    +        if (!opts.is_set_num_executors()) {
    +            opts.set_num_executors(Collections.emptyMap());
    +        }
    +        final TopologyActionOptions actionOpts = new TopologyActionOptions();
    +        actionOpts.set_rebalance_options(opts);
    +
    +        final StormBase rebalancing = new StormBase();
    +        rebalancing.set_status(TopologyStatus.REBALANCING);
    +        rebalancing.set_prev_status(base.get_status());
    +        rebalancing.set_topology_action_options(actionOpts);
    +        rebalancing.set_component_executors(Collections.emptyMap());
    +        rebalancing.set_component_debug(Collections.emptyMap());
    +        return rebalancing;
    +    };
    +    
    +    /**
    +     * STARTUP handling for a topology already in KILLED: schedules the REMOVE event
    +     * again using the wait time stored in the base's kill options, and returns null.
    +     */
    +    private static final TopologyStateTransition STARTUP_WHEN_KILLED_TRANSITION = (args, nimbus, topoId, base) -> {
    +        final KillOptions storedKillOpts = base.get_topology_action_options().get_kill_options();
    +        final int waitSecs = storedKillOpts.get_wait_secs();
    +        nimbus.delayEvent(topoId, waitSecs, TopologyActions.REMOVE, null);
    +        return null;
    +    };
    +    
    +    /**
    +     * Removes the topology from the cluster state. When the blob store is a
    +     * LocalFsBlobStore, the blobstore key and key version entries for each of the
    +     * topology's keys are removed from the cluster state as well. Always returns null.
    +     */
    +    private static final TopologyStateTransition REMOVE_TRANSITION = (args, nimbus, topoId, base) -> {
    +        LOG.info("Killing topology: {}", topoId);
    +        final IStormClusterState clusterState = nimbus.getStormClusterState();
    +        clusterState.removeStorm(topoId);
    +        final BlobStore blobStore = nimbus.getBlobStore();
    +        if (!(blobStore instanceof LocalFsBlobStore)) {
    +            return null;
    +        }
    +        for (String blobKey : Nimbus.getKeyListFromId(nimbus.getConf(), topoId)) {
    +            clusterState.removeBlobstoreKey(blobKey);
    +            clusterState.removeKeyVersion(blobKey);
    +        }
    +        return null;
    +    };
    +    
    +    /**
    +     * STARTUP handling for a topology already in REBALANCING: schedules the
    +     * DO_REBALANCE event again using the wait time stored in the base's rebalance
    +     * options, and returns null.
    +     */
    +    private static final TopologyStateTransition STARTUP_WHEN_REBALANCING_TRANSITION = (args, nimbus, topoId, base) -> {
    +        final RebalanceOptions storedRebalanceOpts = base.get_topology_action_options().get_rebalance_options();
    +        final int waitSecs = storedRebalanceOpts.get_wait_secs();
    +        nimbus.delayEvent(topoId, waitSecs, TopologyActions.DO_REBALANCE, null);
    +        return null;
    +    };
    +    
    +    // Runs nimbus.doRebalance for the topology, then returns a new base whose status
    +    // is the one stored in base.get_prev_status() (set by REBALANCE_TRANSITION).
    +    private static final TopologyStateTransition DO_REBALANCE_TRANSITION = (args, nimbus, topoId, base) -> {
    +        nimbus.doRebalance(topoId, base);
    +        return Nimbus.make(base.get_prev_status());
    +    };
    +    
    +    // Immutable lookup table: current TopologyStatus -> (TopologyActions -> transition to run).
    +    // A status/action pair without an entry has no transition defined in this table;
    +    // how the caller treats such a miss is not visible in this section.
    +    private static final Map<TopologyStatus, Map<TopologyActions, TopologyStateTransition>> TOPO_STATE_TRANSITIONS = 
    +            new ImmutableMap.Builder<TopologyStatus, Map<TopologyActions, TopologyStateTransition>>()
    +            // ACTIVE: can be inactivated, rebalanced or killed; ACTIVATE is a no-op
    +            .put(TopologyStatus.ACTIVE, new ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
    +                    .put(TopologyActions.INACTIVATE, INACTIVE_TRANSITION)
    +                    .put(TopologyActions.ACTIVATE, NOOP_TRANSITION)
    +                    .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION)
    +                    .put(TopologyActions.KILL, KILL_TRANSITION)
    +                    .build())
    +            // INACTIVE: can be activated, rebalanced or killed; INACTIVATE is a no-op
    +            .put(TopologyStatus.INACTIVE, new ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
    +                    .put(TopologyActions.ACTIVATE, ACTIVE_TRANSITION)
    +                    .put(TopologyActions.INACTIVATE, NOOP_TRANSITION)
    +                    .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION)
    +                    .put(TopologyActions.KILL, KILL_TRANSITION)
    +                    .build())
    +            // KILLED: STARTUP re-schedules the removal, KILL may be issued again, REMOVE deletes the topology
    +            .put(TopologyStatus.KILLED, new ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
    +                    .put(TopologyActions.STARTUP, STARTUP_WHEN_KILLED_TRANSITION)
    +                    .put(TopologyActions.KILL, KILL_TRANSITION)
    +                    .put(TopologyActions.REMOVE, REMOVE_TRANSITION)
    +                    .build())
    +            // REBALANCING: STARTUP re-schedules the rebalance, KILL is allowed, DO_REBALANCE performs it
    +            .put(TopologyStatus.REBALANCING, new ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
    +                    .put(TopologyActions.STARTUP, STARTUP_WHEN_REBALANCING_TRANSITION)
    +                    .put(TopologyActions.KILL, KILL_TRANSITION)
    +                    .put(TopologyActions.DO_REBALANCE, DO_REBALANCE_TRANSITION)
    +                    .build())
    +            .build();
    +    
    +    // END TOPOLOGY STATE TRANSITIONS
    +    
    +    /**
    +     * Map update that leaves its input untouched and returns a new HashMap holding
    +     * the input's entries plus one fixed key/value binding.
    +     */
    +    private static final class Assoc<K,V> implements UnaryOperator<Map<K, V>> {
    +        private final K key;
    +        private final V value;
    +
    +        public Assoc(K key, V value) {
    +            this.value = value;
    +            this.key = key;
    +        }
    +
    +        /**
    +         * @param t the map to copy
    +         * @return a new map with the contents of t and key bound to value
    +         */
    +        @Override
    +        public Map<K, V> apply(Map<K, V> t) {
    +            final Map<K, V> updated = new HashMap<>();
    +            updated.putAll(t);
    +            updated.put(key, value);
    +            return updated;
    +        }
    +    }
    +    
    +    /**
    +     * Map update that leaves its input untouched and returns a new HashMap holding
    +     * the input's entries minus one fixed key.
    +     */
    +    private static final class Dissoc<K,V> implements UnaryOperator<Map<K, V>> {
    +        private final K key;
    +
    +        public Dissoc(K key) {
    +            this.key = key;
    +        }
    +
    +        /**
    +         * @param t the map to copy
    +         * @return a new map with the contents of t except any entry for key
    +         */
    +        @Override
    +        public Map<K, V> apply(Map<K, V> t) {
    +            final Map<K, V> trimmed = new HashMap<>();
    +            trimmed.putAll(t);
    +            trimmed.remove(key);
    +            return trimmed;
    +        }
    +    }
    +    
    +    /**
    +     * INimbus implementation whose prepare and assignSlots do nothing, that forces no
    +     * scheduler, and that offers one slot per port listed in each supervisor's meta.
    +     */
    +    @VisibleForTesting
    +    public static class StandAloneINimbus implements INimbus {
    +
    +        @Override
    +        public void prepare(@SuppressWarnings("rawtypes") Map stormConf, String schedulerLocalDir) {
    +            // Intentionally empty
    +        }
    +
    +        /**
    +         * @return one WorkerSlot for every (supervisor id, port) pair, where the ports
    +         *     come from the supervisor's meta, which is cast to a collection of numbers
    +         */
    +        @SuppressWarnings("unchecked")
    +        @Override
    +        public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> supervisors,
    +                Topologies topologies, Set<String> topologiesMissingAssignments) {
    +            final Set<WorkerSlot> slots = new HashSet<>();
    +            for (SupervisorDetails supervisor : supervisors) {
    +                final String supervisorId = supervisor.getId();
    +                final Collection<Number> ports = (Collection<Number>) supervisor.getMeta();
    +                for (Number port : ports) {
    +                    slots.add(new WorkerSlot(supervisorId, port));
    +                }
    +            }
    +            return slots;
    +        }
    +
    +        @Override
    +        public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
    +            // Intentionally empty
    +        }
    +
    +        /**
    +         * @return the host of the supervisor registered under nodeId, or null when
    +         *     there is no such supervisor
    +         */
    +        @Override
    +        public String getHostName(Map<String, SupervisorDetails> supervisors, String nodeId) {
    +            final SupervisorDetails supervisor = supervisors.get(nodeId);
    +            return supervisor == null ? null : supervisor.getHost();
    +        }
    +
    +        /**
    +         * @return always null, i.e. no scheduler is forced
    +         */
    +        @Override
    +        public IScheduler getForcedScheduler() {
    +            return null;
    +        }
    +
    +    }
    +    
    +    /**
    +     * Plain mutable holder that groups several topology-related values. It has no
    +     * methods and no constructor of its own; every field is public and starts at its
    +     * Java default value.
    +     */
    +    private static class CommonTopoInfo {
    +        public Map<String, Object> topoConf;
    +        public String topoName;
    +        public StormTopology topology;
    +        // presumably task id -> component id; confirm where this is populated
    +        public Map<Integer, String> taskToComponent;
    +        public StormBase base;
    +        public int launchTimeSecs;
    +        public Assignment assignment;
    +        // presumably executor (as a list of ints) -> heartbeat data; confirm where this is populated
    +        public Map<List<Integer>, Map<String, Object>> beats;
    +        public HashSet<String> allComponents;
    +
    +    }
    +    
    +    @SuppressWarnings("deprecation")
    +    private static <T extends AutoCloseable> TimeCacheMap<String, T> fileCacheMap(Map<String, Object> conf) {
    +        return new TimeCacheMap<>(Utils.getInt(conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 600),
    +                (id, stream) -> {
    +                    try {
    +                        stream.close();
    +                    } catch (Exception e) {
    +                        throw new RuntimeException(e);
    +                    }
    +                });
    +    }
    +
    +    private static <K, V> Map<K, V> merge(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> other) {
    +        Map<K, V> ret = new HashMap<>(first);
    +        if (other != null) {
    +            ret.putAll(other);
    +        }
    +        return ret;
    +    }
    +    
    +    /**
    +     * Collects the entries of second whose value differs from the value that first
    +     * holds for the same key. Keys that appear only in first are never reported.
    +     * The comparison is null-safe: a null value in second counts as unchanged only
    +     * when first also yields null for that key (absent or mapped to null).
    +     * @param first the map to compare against; must not be null
    +     * @param second the map whose changed or added entries are returned; must not be null
    +     * @return a new map holding the entries of second that are not equal in first
    +     */
    +    private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> second) {
    +        Map<K, V> ret = new HashMap<>();
    +        for (Entry<? extends K, ? extends V> entry: second.entrySet()) {
    +            K key = entry.getKey();
    +            V newValue = entry.getValue();
    +            Object oldValue = first.get(key);
    +            // Calling equals on a null value would throw, so handle null explicitly
    +            boolean unchanged = (newValue == null) ? (oldValue == null) : newValue.equals(oldValue);
    +            if (!unchanged) {
    +                ret.put(key, newValue);
    +            }
    +        }
    +        return ret;
    +    }
    +
    +    private static IScheduler makeScheduler(Map<String, Object> conf, INimbus inimbus) {
    +        String schedClass = (String) conf.get(Config.STORM_SCHEDULER);
    +        IScheduler scheduler = inimbus == null ? null : inimbus.getForcedScheduler();
    +        if (scheduler != null) {
    +            LOG.info("Using forced scheduler from INimbus {} {}", scheduler.getClass(), scheduler);
    +        } else if (schedClass != null){
    +            LOG.info("Using custom scheduler: {}", schedClass);
    +            scheduler = Utils.newInstance(schedClass);
    +        } else {
    +            LOG.info("Using default scheduler");
    +            scheduler = new DefaultScheduler();
    +        }
    +        scheduler.prepare(conf);
    +        return scheduler;
    +    }
    +
    +    /**
    +     * Constructs a TimeCacheMap instance with a blob store timeout whose
    +     * expiration callback invokes cancel on the value held by an expired entry when
    +     * that value is an AtomicOutputStream and calls close otherwise.
    +     * @param conf the config to use
    +     * @return the newly created map
    +     */
    +    @SuppressWarnings("deprecation")
    +    private static <T extends AutoCloseable> TimeCacheMap<String, T> makeBlobCacheMap(Map<String, Object> conf) {
    +        return new TimeCacheMap<>(Utils.getInt(conf.get(Config.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 600),
    +                (id, stream) -> {
    +                    try {
    +                        if (stream instanceof AtomicOutputStream) {
    +                            ((AtomicOutputStream) stream).cancel();
    +                        } else {
    +                            stream.close();
    +                        }
    +                    } catch (Exception e) {
    +                        throw new RuntimeException(e);
    +                    }
    +                });
    +    }
    +    
    +    /**
    +     * Constructs a TimeCacheMap instance with a blobstore timeout and no callback function.
    +     * @param conf
    +     * @return
    +     */
    +    @SuppressWarnings("deprecation")
    +    private static TimeCacheMap<String, Iterator<String>> makeBlobListCachMap(Map<String, Object> conf) {
    +        return new TimeCacheMap<>(Utils.getInt(conf.get(Config.NIMBUS_BLOBSTORE_EXPIRATION_SECS), 600));
    +    }
    +    
    +    private static ITopologyActionNotifierPlugin createTopologyActionNotifier(Map<String, Object> conf) {
    +        String clazz = (String) conf.get(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN);
    +        ITopologyActionNotifierPlugin ret = null;
    +        if (clazz != null && !clazz.isEmpty()) {
    +            ret = Utils.newInstance(clazz);
    +            try {
    +                ret.prepare(conf);
    +            } catch (Exception e) {
    +                LOG.warn("Ignoring exception, Could not initialize {}", clazz, e);
    +                ret = null;
    +            }
    +        }
    +        return ret;
    +    }
    +    
    +    @SuppressWarnings("unchecked")
    +    private static List<ClusterMetricsConsumerExecutor> makeClusterMetricsConsumerExecutors(Map<String, Object> conf) {
    +        Collection<Map<String, Object>> consumers = (Collection<Map<String, Object>>) conf.get(Config.STORM_CLUSTER_METRICS_CONSUMER_REGISTER);
    +        List<ClusterMetricsConsumerExecutor> ret = new ArrayList<>();
    +        if (consumers != null) {
    +            for (Map<String, Object> consumer : consumers) {
    +                ret.add(new ClusterMetricsConsumerExecutor((String) consumer.get("class"), consumer.get("argument")));
    +            }
    +        }
    +        return ret;
    +    }
    +    
    +    private static Subject getSubject() {
    +        return ReqContext.context().subject();
    +    }
    +    
    +    static Map<String, Object> readTopoConf(String topoId, BlobStore blobStore) throws KeyNotFoundException, AuthorizationException, IOException {
    +        return blobStore.readTopologyConf(topoId, getSubject());
    +    }
    +    
    +    static List<String> getKeyListFromId(Map<String, Object> conf, String id) {
    +        List<String> ret = new ArrayList<>(3);
    +        ret.add(ConfigUtils.masterStormCodeKey(id));
    +        ret.add(ConfigUtils.masterStormConfKey(id));
    +        if (!ConfigUtils.isLocalMode(conf)) {
    +            ret.add(ConfigUtils.masterStormJarKey(id));
    +        }
    +        return ret;
    +    }
    +    
    +    /**
    +     * Asks a KeySequenceNumber built for the given key and nimbus for its sequence number.
    +     * @param key the blob key
    +     * @param nimbusInfo the nimbus the sequence number is computed for
    +     * @param conf the config handed to the sequence number lookup
    +     * @return the key sequence number
    +     */
    +    private static int getVersionForKey(String key, NimbusInfo nimbusInfo, Map<String, Object> conf) {
    +        return new KeySequenceNumber(key, nimbusInfo).getKeySequenceNumber(conf);
    +    }
    +    
    +    private static StormTopology readStormTopology(String topoId, BlobStore store) throws KeyNotFoundException, AuthorizationException, IOException {
    +        return store.readTopology(topoId, getSubject());
    +    }
    +    
    +    private static Map<String, Object> readTopoConfAsNimbus(String topoId, BlobStore store) throws KeyNotFoundException, AuthorizationException, IOException {
    +        return store.readTopologyConf(topoId, NIMBUS_SUBJECT);
    +    }
    +    
    +    private static StormTopology readStormTopologyAsNimbus(String topoId, BlobStore store) throws KeyNotFoundException, AuthorizationException, IOException {
    +        return store.readTopology(topoId, NIMBUS_SUBJECT);
    +    }
    +    
    +    /**
    +     * convert {topology-id -> SchedulerAssignment} to
    +     *         {topology-id -> {executor [node port]}}
    +     * @return
    +     */
    +    private static Map<String, Map<List<Long>, List<Object>>> computeTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments) {
    +        Map<String, Map<List<Long>, List<Object>>> ret = new HashMap<>();
    +        for (Entry<String, SchedulerAssignment> schedEntry: schedAssignments.entrySet()) {
    +            Map<List<Long>, List<Object>> execToNodePort = new HashMap<>();
    +            for (Entry<ExecutorDetails, WorkerSlot> execAndNodePort: schedEntry.getValue().getExecutorToSlot().entrySet()) {
    +                ExecutorDetails exec = execAndNodePort.getKey();
    +                WorkerSlot slot = execAndNodePort.getValue();
    +                
    +                List<Long> listExec = new ArrayList<>(2);
    +                listExec.add((long) exec.getStartTask());
    +                listExec.add((long) exec.getEndTask());
    +                
    +                List<Object> nodePort = new ArrayList<>(2);
    +                nodePort.add(slot.getNodeId());
    +                nodePort.add((long)slot.getPort());
    +                
    +                execToNodePort.put(listExec, nodePort);
    +            }
    +            ret.put(schedEntry.getKey(), execToNodePort);
    +        }
    +        return ret;
    +    }
    +    
    +    /**
    +     * @param assignment the assignment to inspect; may be null
    +     * @return the number of slots in the assignment, or 0 when there is no assignment
    +     */
    +    private static int numUsedWorkers(SchedulerAssignment assignment) {
    +        return assignment == null ? 0 : assignment.getSlots().size();
    +    }
    +    
    +    /**
    +     * Translate {topology-id -> SchedulerAssignment} into
    +     *           {topology-id -> {[node port] [mem-on-heap mem-off-heap cpu]}}.
    +     * Make sure this can deal with other non-RAS schedulers;
    +     * later we may further support map-for-any-resources.
    +     * @param schedAssignments the assignments
    +     * @return  {topology-id {[node port] [mem-on-heap mem-off-heap cpu]}}
    +     */
    +    private static Map<String, Map<List<Object>, List<Double>>> computeTopoToNodePortToResources(Map<String, SchedulerAssignment> schedAssignments) {
    +        Map<String, Map<List<Object>, List<Double>>> topoToResources = new HashMap<>();
    +        schedAssignments.forEach((topoId, schedAssignment) -> {
    +            Map<List<Object>, List<Double>> resourcesBySlot = new HashMap<>();
    +            // A slot shared by several executors is visited more than once; each visit
    +            // writes the same key, so the map ends up with one entry per slot.
    +            for (WorkerSlot workerSlot : schedAssignment.getExecutorToSlot().values()) {
    +                List<Object> nodeAndPort = new ArrayList<>(2);
    +                nodeAndPort.add(workerSlot.getNodeId());
    +                nodeAndPort.add((long) workerSlot.getPort());
    +
    +                List<Double> allocated = new ArrayList<>(3);
    +                allocated.add(workerSlot.getAllocatedMemOnHeap());
    +                allocated.add(workerSlot.getAllocatedMemOffHeap());
    +                allocated.add(workerSlot.getAllocatedCpu());
    +
    +                resourcesBySlot.put(nodeAndPort, allocated);
    +            }
    +            topoToResources.put(topoId, resourcesBySlot);
    +        });
    +        return topoToResources;
    +    }
    +
    +    /**
    +     * Compute {topology-id -> {executor [node port]}} for the new scheduler assignments and,
    +     * for topologies that already had an assignment, log which executors moved.
    +     * The logging has no effect on the returned value.
    +     * @param schedAssignments the newly scheduled assignments
    +     * @param existingAssignments the previous assignments by topology id, may be null or empty
    +     * @return the result of computeTopoToExecToNodePort for schedAssignments
    +     */
    +    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments,
    +            Map<String, Assignment> existingAssignments) {
    +        Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
    +        //Print some useful information
    +        if (existingAssignments != null && !existingAssignments.isEmpty()) {
    +            for (Entry<String, Map<List<Long>, List<Object>>> entry: ret.entrySet()) {
    +                String topoId = entry.getKey();
    +                Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
    +                Assignment assignment = existingAssignments.get(topoId);
    +                if (assignment == null) {
    +                    // topology had no previous assignment, nothing to compare against
    +                    continue;
    +                }
    +                Map<List<Long>, NodeInfo> old = assignment.get_executor_node_port();
    +                Map<List<Long>, List<Object>> reassigned = new HashMap<>();
    +                for (Entry<List<Long>, List<Object>> execAndNodePort: execToNodePort.entrySet()) {
    +                    NodeInfo oldAssigned = old.get(execAndNodePort.getKey());
    +                    String node = (String) execAndNodePort.getValue().get(0);
    +                    Long port = (Long) execAndNodePort.getValue().get(1);
    +                    // An executor counts as reassigned when it is new, or its node or its
    +                    // (first) port differs from the previous assignment.
    +                    if (oldAssigned == null || !oldAssigned.get_node().equals(node) 
    +                            || !port.equals(oldAssigned.get_port_iterator().next())) {
    +                        reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue());
    +                    }
    +                }
    +
    +                if (!reassigned.isEmpty()) {
    +                    // number of distinct [node port] pairs in the new assignment
    +                    int count = (new HashSet<>(execToNodePort.values())).size();
    +                    Set<List<Long>> reExecs = reassigned.keySet();
    +                    LOG.info("Reassigning {} to {} slots", topoId, count);
    +                    LOG.info("Reassign executors: {}", reExecs);
    +                }
    +            }
    +        }
    +        return ret;
    +    }
    +    
    +    /**
    +     * Collect the executors whose slot grouping differs between an old and a new assignment.
    +     * Both inputs are inverted into {[node port] -> sorted list of executors} and handed to
    +     * mapDiff; the executor lists of every entry mapDiff returns are concatenated.
    +     * NOTE(review): the exact semantics of mapDiff are defined elsewhere - confirm which
    +     * side's entries it reports.
    +     * @param map the old executor to node/port mapping, may be null
    +     * @param newExecToNodePort the new executor to [node port] mapping, may be null
    +     * @return the executors taken from the differing slots
    +     */
    +    private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> map,
    +            Map<List<Long>, List<Object>> newExecToNodePort) {
    +        HashMap<NodeInfo, List<List<Long>>> tmpSlotAssigned = map == null ? new HashMap<>() : Utils.reverseMap(map);
    +        HashMap<List<Object>, List<List<Long>>> slotAssigned = new HashMap<>();
    +        for (Entry<NodeInfo, List<List<Long>>> entry: tmpSlotAssigned.entrySet()) {
    +            NodeInfo ni = entry.getKey();
    +            // Re-key the old side as [node port] so it is comparable with the new side.
    +            List<Object> key = new ArrayList<>(2);
    +            key.add(ni.get_node());
    +            key.add(ni.get_port_iterator().next());
    +            // Sort by start task so that list equality does not depend on iteration order.
    +            List<List<Long>> value = new ArrayList<>(entry.getValue());
    +            value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
    +            slotAssigned.put(key, value);
    +        }
    +        HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() : Utils.reverseMap(newExecToNodePort);
    +        HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<>();
    +        for (Entry<List<Object>, List<List<Long>>> entry: tmpNewSlotAssigned.entrySet()) {
    +            List<List<Long>> value = new ArrayList<>(entry.getValue());
    +            value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
    +            newSlotAssigned.put(entry.getKey(), value);
    +        }
    +        Map<List<Object>, List<List<Long>>> diff = mapDiff(slotAssigned, newSlotAssigned);
    +        List<List<Long>> ret = new ArrayList<>();
    +        for (List<List<Long>> val: diff.values()) {
    +            ret.addAll(val);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Find the slots used by the current assignment that the old assignment did not use.
    +     * @param old the previous assignment
    +     * @param current the new assignment
    +     * @return the slots present in current but not in old
    +     */
    +    private static Set<WorkerSlot> newlyAddedSlots(Assignment old, Assignment current) {
    +        Set<NodeInfo> previous = new HashSet<>(old.get_executor_node_port().values());
    +        Set<NodeInfo> added = new HashSet<>(current.get_executor_node_port().values());
    +        added.removeAll(previous);
    +
    +        Set<WorkerSlot> addedSlots = new HashSet<>();
    +        added.forEach(nodeInfo ->
    +            addedSlots.add(new WorkerSlot(nodeInfo.get_node(), nodeInfo.get_port_iterator().next())));
    +        return addedSlots;
    +    }
    +    
    +    /**
    +     * Build a SupervisorDetails for every supervisor known to the cluster state.
    +     * @param state the cluster state to read supervisor info from
    +     * @return supervisor id to SupervisorDetails (hostname, scheduler meta, resources; the
    +     *     fourth constructor argument is passed as null)
    +     */
    +    private static Map<String, SupervisorDetails> basicSupervisorDetailsMap(IStormClusterState state) {
    +        Map<String, SupervisorDetails> detailsById = new HashMap<>();
    +        state.allSupervisorInfo().forEach((supervisorId, supervisorInfo) -> {
    +            SupervisorDetails details = new SupervisorDetails(supervisorId, supervisorInfo.get_hostname(),
    +                    supervisorInfo.get_scheduler_meta(), null, supervisorInfo.get_resources_map());
    +            detailsById.put(supervisorId, details);
    +        });
    +        return detailsById;
    +    }
    +    
    +    /**
    +     * Check whether the cluster state can resolve a topology name to an id.
    +     * @param state the cluster state to query
    +     * @param topoName the name of the topology
    +     * @return true if state.getTopoId(topoName) yields a value
    +     */
    +    private static boolean isTopologyActive(IStormClusterState state, String topoName) {
    +        return state.getTopoId(topoName).isPresent();
    +    }
    +    
    +    /**
    +     * Read a topology's conf from the blob store, reporting a missing key as "not alive".
    +     * @param topoId the id of the topology
    +     * @param store the blob store to read from
    +     * @return the topology conf
    +     * @throws NotAliveException if the conf key is not found and topoId is not null
    +     * @throws NullPointerException if the conf key is not found and topoId is null
    +     */
    +    private static Map<String, Object> tryReadTopoConf(String topoId, BlobStore store) throws NotAliveException, AuthorizationException, IOException {
    +        try {
    +            //Was a try-cause but I looked at the code around this and key not found is not wrapped in runtime,
    +            // so it is not needed
    +            return readTopoConfAsNimbus(topoId, store);
    +        } catch (KeyNotFoundException e) {
    +            if (topoId != null) {
    +                throw new NotAliveException(topoId);
    +            }
    +            throw new NullPointerException();
    +        }
    +    }
    +    
    +    // Shared immutable fallbacks for OR(...) when a lookup returns null.
    +    // Collections.emptyList()/emptySet() are already immutable, so no unmodifiable wrapper is needed.
    +    private static final List<String> EMPTY_STRING_LIST = Collections.emptyList();
    +    private static final Set<String> EMPTY_STRING_SET = Collections.emptySet();
    +    
    +    /**
    +     * Work out which topology ids still have state lying around but are no longer active.
    +     * @param state the cluster state to query
    +     * @param store the blob store to query
    +     * @return ids seen in heartbeats, errors, stored blobs or backpressure, minus the active ids
    +     */
    +    @VisibleForTesting
    +    public static Set<String> topoIdsToClean(IStormClusterState state, BlobStore store) {
    +        // Each source may return null; OR substitutes an empty collection in that case.
    +        Set<String> candidates = new HashSet<>(OR(state.heartbeatStorms(), EMPTY_STRING_LIST));
    +        candidates.addAll(OR(state.errorTopologies(), EMPTY_STRING_LIST));
    +        candidates.addAll(OR(store.storedTopoIds(), EMPTY_STRING_SET));
    +        candidates.addAll(OR(state.backpressureTopologies(), EMPTY_STRING_LIST));
    +
    +        candidates.removeAll(OR(state.activeStorms(), EMPTY_STRING_LIST));
    +        return candidates;
    +    }
    +    
    +    /**
    +     * Get the upper-cased name of a topology's status.
    +     * @param base the StormBase to read the status from
    +     * @return the status name in upper case, or null if no status is set
    +     */
    +    private static String extractStatusStr(StormBase base) {
    +        TopologyStatus status = base.get_status();
    +        if (status == null) {
    +            return null;
    +        }
    +        return status.name().toUpperCase();
    +    }
    +    
    +    /**
    +     * Work out how many tasks a component gets.
    +     * @param topoConf the topology conf
    +     * @param component the component (spout/bolt object) to look at
    +     * @return the configured task count (defaulting to the number of start executors),
    +     *     capped by Config.TOPOLOGY_MAX_TASK_PARALLELISM when that is set
    +     */
    +    private static int componentParallelism(Map<String, Object> topoConf, Object component) throws InvalidTopologyException {
    +        Map<String, Object> combinedConf = merge(topoConf, StormCommon.componentConf(component));
    +        int numTasks = Utils.getInt(combinedConf.get(Config.TOPOLOGY_TASKS), StormCommon.numStartExecutors(component));
    +        Integer maxParallel = Utils.getInt(combinedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM), null);
    +        return maxParallel == null ? numTasks : Math.min(maxParallel, numTasks);
    +    }
    +    
    +    /**
    +     * Copy a topology and pin Config.TOPOLOGY_TASKS in every component's JSON conf.
    +     * The passed in topology is not modified; the work is done on a deep copy.
    +     * @param topoConf the topology conf
    +     * @param topology the topology to normalize
    +     * @return the normalized copy
    +     */
    +    private static StormTopology normalizeTopology(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
    +        StormTopology normalized = topology.deepCopy();
    +        for (Object component: StormCommon.allComponents(normalized).values()) {
    +            Map<String, Object> componentConf = StormCommon.componentConf(component);
    +            componentConf.put(Config.TOPOLOGY_TASKS, componentParallelism(topoConf, component));
    +            StormCommon.getComponentCommon(component).set_json_conf(JSONValue.toJSONString(componentConf));
    +        }
    +        return normalized;
    +    }
    +    
    +    /**
    +     * Add every configured decorator to the accumulated set.
    +     * @param decorators the set to add to
    +     * @param conf the configured decorators, may be null (then nothing is added)
    +     */
    +    private static void addToDecorators(Set<String> decorators, List<String> conf) {
    +        if (conf == null) {
    +            return;
    +        }
    +        decorators.addAll(conf);
    +    }
    +    
    +    /**
    +     * Fold a configured serializer list into the accumulated map.
    +     * Map entries are copied over as they are; any other entry is cast to String
    +     * and registered with a null value.
    +     * @param ser the map to add to
    +     * @param conf the configured registrations, may be null (then nothing is added)
    +     */
    +    @SuppressWarnings("unchecked")
    +    private static void addToSerializers(Map<String, String> ser, List<Object> conf) {
    +        if (conf == null) {
    +            return;
    +        }
    +        for (Object registration: conf) {
    +            if (!(registration instanceof Map)) {
    +                ser.put((String) registration, null);
    +                continue;
    +            }
    +            ser.putAll((Map<String, String>) registration);
    +        }
    +    }
    +    
    +    /**
    +     * Build the conf that is stored for a topology.
    +     * Starts from a copy of topoConf and overrides the kryo registrations and decorators with
    +     * the union of what the components and the topology (or, failing that, the daemon conf)
    +     * declare, then copies the acker/eventlogger executor counts and the max task parallelism
    +     * from the merge of conf and topoConf.
    +     * @param conf the daemon conf
    +     * @param topoConf the conf submitted with the topology
    +     * @param topology the topology whose component confs are consulted
    +     * @return a new map; neither conf nor topoConf is modified here
    +     */
    +    @SuppressWarnings("unchecked")
    +    private static Map<String, Object> normalizeConf(Map<String,Object> conf, Map<String, Object> topoConf, StormTopology topology) {
    +        //ensure that serializations are same for all tasks no matter what's on
    +        // the supervisors. this also allows you to declare the serializations as a sequence
    +        List<Map<String, Object>> allConfs = new ArrayList<>();
    +        for (Object comp: StormCommon.allComponents(topology).values()) {
    +            allConfs.add(StormCommon.componentConf(comp));
    +        }
    +
    +        Set<String> decorators = new HashSet<>();
    +        //Yes we are putting in a config that is not the same type we pulled out.
    +        Map<String, String> serializers = new HashMap<>();
    +        for (Map<String, Object> c: allConfs) {
    +            addToDecorators(decorators, (List<String>) c.get(Config.TOPOLOGY_KRYO_DECORATORS));
    +            addToSerializers(serializers, (List<Object>) c.get(Config.TOPOLOGY_KRYO_REGISTER));
    +        }
    +        // The topology's own setting wins; the daemon conf is only used when topoConf has no such key.
    +        addToDecorators(decorators, (List<String>)topoConf.getOrDefault(Config.TOPOLOGY_KRYO_DECORATORS, 
    +                conf.get(Config.TOPOLOGY_KRYO_DECORATORS)));
    +        addToSerializers(serializers, (List<Object>)topoConf.getOrDefault(Config.TOPOLOGY_KRYO_REGISTER, 
    +                conf.get(Config.TOPOLOGY_KRYO_REGISTER)));
    +        
    +        Map<String, Object> mergedConf = merge(conf, topoConf);
    +        Map<String, Object> ret = new HashMap<>(topoConf);
    +        ret.put(Config.TOPOLOGY_KRYO_REGISTER, serializers);
    +        ret.put(Config.TOPOLOGY_KRYO_DECORATORS, new ArrayList<>(decorators));
    +        ret.put(Config.TOPOLOGY_ACKER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
    +        ret.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
    +        ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
    +        return ret;
    +    }
    +    
    +    /**
    +     * Best-effort removal of a blob, and of its cluster-state entry when the store is a
    +     * LocalFsBlobStore. Failures are logged and otherwise ignored.
    +     * @param store the blob store to delete from
    +     * @param key the key of the blob to delete
    +     * @param state the cluster state holding the blobstore key
    +     */
    +    private static void rmBlobKey(BlobStore store, String key, IStormClusterState state) {
    +        try {
    +            store.deleteBlob(key, NIMBUS_SUBJECT);
    +            if (store instanceof LocalFsBlobStore) {
    +                state.removeBlobstoreKey(key);
    +            }
    +        } catch (Exception e) {
    +            //Yes eat the exception
    +            // Pass the exception as the trailing argument (not as a placeholder value) so the
    +            // stack trace is logged, and include the key so the failure can be traced.
    +            LOG.info("Exception while removing blob key {}", key, e);
    +        }
    +    }
    +    
    +    /**
    +     * Deletes jar files in dirLoc older than seconds.
    +     * @param dirLoc the location to look in for file
    +     * @param seconds how old is too old and should be deleted
    +     */
    +    @VisibleForTesting
    +    public static void cleanInbox(String dirLoc, int seconds) {
    +        final long now = Time.currentTimeMillis();
    +        final long ms = Time.secsToMillis(seconds);
    +        File dir = new File(dirLoc);
    +        for (File f : dir.listFiles((f) -> f.isFile() && ((f.lastModified() + ms) <= now))) {
    +            if (f.delete()) {
    +                LOG.info("Cleaning inbox ... deleted: {}", f.getName());
    +            } else {
    +                LOG.error("Cleaning inbox ... error deleting: {}", f.getName());
    +            }
    +        }
    +    }
    +    
    +    /**
    +     * Convert a two element executor list into an ExecutorInfo.
    +     * @param exec the executor; element 0 and element 1 become the two constructor arguments
    +     * @return the equivalent ExecutorInfo
    +     */
    +    private static ExecutorInfo toExecInfo(List<Long> exec) {
    +        int first = exec.get(0).intValue();
    +        int second = exec.get(1).intValue();
    +        return new ExecutorInfo(first, second);
    +    }
    +    
    +    // A valid name is non-empty and contains none of '/', '.', ':' or '\'.
    +    private static final Pattern TOPOLOGY_NAME_REGEX = Pattern.compile("^[^/.:\\\\]+$");
    +
    +    /**
    +     * Check a topology name against TOPOLOGY_NAME_REGEX.
    +     * @param name the name to validate
    +     * @throws InvalidTopologyException if the name does not match
    +     */
    +    private static void validateTopologyName(String name) throws InvalidTopologyException {
    +        if (TOPOLOGY_NAME_REGEX.matcher(name).matches()) {
    +            return;
    +        }
    +        throw new InvalidTopologyException("Topology name must match " + TOPOLOGY_NAME_REGEX);
    +    }
    +    
    +    /**
    +     * Read a topology from the blob store, reporting a missing key as "not alive".
    +     * @param topoId the id of the topology
    +     * @param store the blob store to read from
    +     * @return the topology
    +     * @throws NotAliveException if the topology key is not found
    +     */
    +    private static StormTopology tryReadTopology(String topoId, BlobStore store) throws NotAliveException, AuthorizationException, IOException {
    +        StormTopology topology;
    +        try {
    +            topology = readStormTopologyAsNimbus(topoId, store);
    +        } catch (KeyNotFoundException e) {
    +            throw new NotAliveException(topoId);
    +        }
    +        return topology;
    +    }
    +    
    +    /**
    +     * Reject topologies that ask for more executors or workers than nimbus allows.
    +     * A limit that is not configured is not enforced. The executor limit is checked first.
    +     * @param topoConf the topology conf (source of the requested worker count, default 1)
    +     * @param nimbusConf the nimbus conf (source of the limits)
    +     * @param topology the topology whose start executors are summed
    +     * @throws InvalidTopologyException if a configured limit is exceeded
    +     */
    +    private static void validateTopologySize(Map<String, Object> topoConf, Map<String, Object> nimbusConf, StormTopology topology) throws InvalidTopologyException {
    +        int requestedWorkers = Utils.getInt(topoConf.get(Config.TOPOLOGY_WORKERS), 1);
    +        Integer maxWorkers = Utils.getInt(nimbusConf.get(Config.NIMBUS_SLOTS_PER_TOPOLOGY), null);
    +
    +        int requestedExecutors = 0;
    +        for (Object component : StormCommon.allComponents(topology).values()) {
    +            requestedExecutors += StormCommon.numStartExecutors(component);
    +        }
    +        Integer maxExecutors = Utils.getInt(nimbusConf.get(Config.NIMBUS_EXECUTORS_PER_TOPOLOGY), null);
    +
    +        boolean tooManyExecutors = maxExecutors != null && requestedExecutors > maxExecutors;
    +        if (tooManyExecutors) {
    +            throw new InvalidTopologyException("Failed to submit topology. Topology requests more than " +
    +                    maxExecutors + " executors.");
    +        }
    +
    +        boolean tooManyWorkers = maxWorkers != null && requestedWorkers > maxWorkers;
    +        if (tooManyWorkers) {
    +            throw new InvalidTopologyException("Failed to submit topology. Topology requests more than " +
    +                    maxWorkers + " workers.");
    +        }
    +    }
    +    
    +    /**
    +     * Turn a log level's relative reset timeout into an absolute epoch.
    +     * A timeout that is not positive clears the epoch instead.
    +     * @param level the log level to update in place
    +     */
    +    private static void setLoggerTimeouts(LogLevel level) {
    +        int timeoutSecs = level.get_reset_log_level_timeout_secs();
    +        if (timeoutSecs <= 0) {
    +            level.unset_reset_log_level_timeout_epoch();
    +            return;
    +        }
    +        level.set_reset_log_level_timeout_epoch(Time.currentTimeSecs() + timeoutSecs);
    +    }
    +    
    +    /**
    +     * List the topologies that have at least one executor assigned to a supervisor.
    +     * @param assignments topology id to assignment
    +     * @param supervisorId the id of the supervisor (compared against each NodeInfo's node)
    +     * @return the ids of the matching topologies, without duplicates, in no particular order
    +     */
    +    @VisibleForTesting
    +    public static List<String> topologiesOnSupervisor(Map<String, Assignment> assignments, String supervisorId) {
    +        Set<String> topoIds = new HashSet<>();
    +        assignments.forEach((topoId, assignment) -> {
    +            for (NodeInfo nodeInfo: assignment.get_executor_node_port().values()) {
    +                if (supervisorId.equals(nodeInfo.get_node())) {
    +                    topoIds.add(topoId);
    +                    // one hit is enough for this topology
    +                    break;
    +                }
    +            }
    +        });
    +        return new ArrayList<>(topoIds);
    +    }
    +    
    +    /**
    +     * Create a ClusterInfo stamped with the current time in seconds.
    +     * @return the new ClusterInfo
    +     */
    +    private static IClusterMetricsConsumer.ClusterInfo mkClusterInfo() {
    +        return new IClusterMetricsConsumer.ClusterInfo(Time.currentTimeSecs());
    +    }
    +    
    +    /**
    +     * Summarize a ClusterSummary as cluster wide data points.
    +     * @param summ the summary to read
    +     * @return data points for supervisors, topologies, slots (total/used/free),
    +     *     executors and tasks, in that order
    +     */
    +    private static List<DataPoint> extractClusterMetrics(ClusterSummary summ) {
    +        List<DataPoint> points = new ArrayList<>();
    +        points.add(new DataPoint("supervisors", summ.get_supervisors_size()));
    +        points.add(new DataPoint("topologies", summ.get_topologies_size()));
    +
    +        // slot counts are summed over all supervisors
    +        int slotsTotal = 0;
    +        int slotsUsed = 0;
    +        for (SupervisorSummary supervisor: summ.get_supervisors()) {
    +            slotsUsed += supervisor.get_num_used_workers();
    +            slotsTotal += supervisor.get_num_workers();
    +        }
    +        points.add(new DataPoint("slotsTotal", slotsTotal));
    +        points.add(new DataPoint("slotsUsed", slotsUsed));
    +        points.add(new DataPoint("slotsFree", slotsTotal - slotsUsed));
    +
    +        // executor and task counts are summed over all topologies
    +        int executors = 0;
    +        int tasks = 0;
    +        for (TopologySummary topology: summ.get_topologies()) {
    +            executors += topology.get_num_executors();
    +            tasks += topology.get_num_tasks();
    +        }
    +        points.add(new DataPoint("executorsTotal", executors));
    +        points.add(new DataPoint("tasksTotal", tasks));
    +        return points;
    +    }
    +
    +    /**
    +     * Break a ClusterSummary down into per supervisor data points.
    +     * @param summ the summary to read
    +     * @return for each supervisor (host, id, current time) its slot, memory and cpu data points
    +     */
    +    private static Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> extractSupervisorMetrics(ClusterSummary summ) {
    +        Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> metricsBySupervisor = new HashMap<>();
    +        for (SupervisorSummary supervisor: summ.get_supervisors()) {
    +            IClusterMetricsConsumer.SupervisorInfo key = new IClusterMetricsConsumer.SupervisorInfo(supervisor.get_host(), supervisor.get_supervisor_id(), Time.currentTimeSecs());
    +
    +            List<DataPoint> points = new ArrayList<>();
    +            points.add(new DataPoint("slotsTotal", supervisor.get_num_workers()));
    +            points.add(new DataPoint("slotsUsed", supervisor.get_num_used_workers()));
    +            points.add(new DataPoint("totalMem", supervisor.get_total_resources().get(Config.SUPERVISOR_MEMORY_CAPACITY_MB)));
    +            points.add(new DataPoint("totalCpu", supervisor.get_total_resources().get(Config.SUPERVISOR_CPU_CAPACITY)));
    +            points.add(new DataPoint("usedMem", supervisor.get_used_mem()));
    +            points.add(new DataPoint("usedCpu", supervisor.get_used_cpu()));
    +
    +            metricsBySupervisor.put(key, points);
    +        }
    +        return metricsBySupervisor;
    +    }
    +    
    +    /**
    +     * Look up a component's resources and run them through ResourceUtils.checkIntialization.
    +     * A component without an entry gets a fresh map, which is not stored back into
    +     * compResourcesMap.
    +     * @param compResourcesMap component id to resources
    +     * @param compId the id of the component
    +     * @param topoConf the topology conf
    +     * @return the (possibly new) resources map for the component
    +     */
    +    private static Map<String, Double> setResourcesDefaultIfNotSet(Map<String, Map<String, Double>> compResourcesMap, String compId, Map<String, Object> topoConf) {
    +        Map<String, Double> configured = compResourcesMap.get(compId);
    +        Map<String, Double> resources = configured != null ? configured : new HashMap<>();
    +        ResourceUtils.checkIntialization(resources, compId, topoConf);
    +        return resources;
    +    }
    +    
    +    /**
    +     * Check that the configured nimbus thrift port can be bound, exiting the JVM if it cannot.
    +     * The probe socket is closed again straight away by the try-with-resources.
    +     * NOTE(review): the failure path exits with status 0, which callers of the process will
    +     * read as success - confirm this is intended.
    +     * @param conf the conf holding Config.NIMBUS_THRIFT_PORT
    +     * @throws IOException on socket errors other than BindException
    +     */
    +    private static void validatePortAvailable(Map<String, Object> conf) throws IOException {
    +        int port = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
    +        try (ServerSocket socket = new ServerSocket(port)) {
    +            //Nothing
    +        } catch (BindException e) {
    +            LOG.error("{} is not available. Check if another process is already listening on {}", port, port);
    +            System.exit(0);
    +        }
    +    }
    +    
    +    /**
    +     * Validate the environment, create and launch a Nimbus, then serve it over thrift.
    +     * A shutdown hook is registered that shuts nimbus down and then stops the thrift server.
    +     * NOTE(review): server.serve() presumably blocks, in which case the return is only
    +     * reached once the server stops - confirm against ThriftServer.
    +     * @param conf the daemon conf
    +     * @param inimbus the INimbus implementation to use
    +     * @return the launched Nimbus
    +     */
    +    private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception {
    +        StormCommon.validateDistributedMode(conf);
    +        // fail fast before any nimbus state is created
    +        validatePortAvailable(conf);
    +        final Nimbus nimbus = new Nimbus(conf, inimbus);
    +        nimbus.launchServer();
    +        final ThriftServer server = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS);
    +        Utils.addShutdownHookWithForceKillIn1Sec(() -> {
    +            nimbus.shutdown();
    +            server.stop();
    +        });
    +        LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION);
    +        server.serve();
    +        return nimbus;
    +    }
    +    
    +    /**
    +     * Launch nimbus using the storm config merged with storm-cluster-auth.yaml.
    +     * @param inimbus the INimbus implementation to use
    +     * @return the launched Nimbus
    +     */
    +    public static Nimbus launch(INimbus inimbus) throws Exception {
    +        Map<String, Object> conf = merge(ConfigUtils.readStormConfig(),
    +                ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
    +        return launchServer(conf, inimbus);
    +    }
    +    
    +    /**
    +     * Entry point: install the default uncaught exception handler and launch nimbus
    +     * with a StandAloneINimbus.
    +     * @param args command line arguments (not used)
    +     */
    +    public static void main(String[] args) throws Exception {
    +        Utils.setupDefaultUncaughtExceptionHandler();
    +        launch(new StandAloneINimbus());
    +    }
    +    
    +    // Instance state. Everything below is assigned exactly once in the constructor, with the
    +    // exception of authorizationHandler.
    +    private final Map<String, Object> conf;
    +    private final NimbusInfo nimbusHostPortInfo;
    +    private final INimbus inimbus;
    +    // Not final: replaceable through setAuthorizationHandler (a @VisibleForTesting hook).
    +    private IAuthorizer authorizationHandler;
    +    private final IAuthorizer impersonationAuthorizationHandler;
    +    private final AtomicLong submittedCount;
    +    private final IStormClusterState stormClusterState;
    +    // Monitor guarding topology state transitions (see transition).
    +    private final Object submitLock;
    +    private final Object credUpdateLock;
    +    private final AtomicReference<Map<String, Map<List<Integer>, Map<String, Object>>>> heartbeatsCache;
    +    @SuppressWarnings("deprecation")
    +    private final TimeCacheMap<String, BufferInputStream> downloaders;
    +    @SuppressWarnings("deprecation")
    +    private final TimeCacheMap<String, WritableByteChannel> uploaders;
    +    private final BlobStore blobStore;
    +    @SuppressWarnings("deprecation")
    +    private final TimeCacheMap<String, BufferInputStream> blobDownloaders;
    +    @SuppressWarnings("deprecation")
    +    private final TimeCacheMap<String, OutputStream> blobUploaders;
    +    @SuppressWarnings("deprecation")
    +    private final TimeCacheMap<String, Iterator<String>> blobListers;
    +    private final UptimeComputer uptime;
    +    private final ITopologyValidator validator;
    +    private final StormTimer timer;
    +    private final IScheduler scheduler;
    +    private final ILeaderElector leaderElector;
    +    private final AtomicReference<Map<String, String>> idToSchedStatus;
    +    private final AtomicReference<Map<String, Double[]>> nodeIdToResources;
    +    private final AtomicReference<Map<String, TopologyResources>> idToResources;
    +    private final AtomicReference<Map<String, Map<WorkerSlot, WorkerResources>>> idToWorkerResources;
    +    private final Collection<ICredentialsRenewer> credRenewers;
    +    private final Object topologyHistoryLock;
    +    private final LocalState topologyHistoryState;
    +    private final Collection<INimbusCredentialPlugin> nimbusAutocredPlugins;
    +    private final ITopologyActionNotifierPlugin nimbusTopologyActionNotifier;
    +    // NOTE(review): "Exceutors" is a misspelling of "Executors"; renaming needs every use updated.
    +    private final List<ClusterMetricsConsumerExecutor> clusterConsumerExceutors;
    +    private final IGroupMappingServiceProvider groupMapper;
    +    private final IPrincipalToLocal principalToLocal;
    +    
    +    /**
    +     * Create the cluster state client used by nimbus.
    +     * ZK_ACLS are only passed on when zookeeper authentication is configured for the storm
    +     * server; otherwise no ACLs (null) are used.
    +     * @param conf the daemon conf
    +     * @return the new cluster state
    +     */
    +    private static IStormClusterState makeStormClusterState(Map<String, Object> conf) throws Exception {
    +        List<ACL> acls = Utils.isZkAuthenticationConfiguredStormServer(conf) ? ZK_ACLS : null;
    +        ClusterStateContext context = new ClusterStateContext(DaemonType.NIMBUS);
    +        return ClusterUtils.mkStormClusterState(conf, acls, context);
    +    }
    +    
    +    /**
    +     * Create a Nimbus whose collaborators are all derived from the conf.
    +     * @param conf the daemon conf
    +     * @param inimbus the INimbus implementation to use
    +     */
    +    public Nimbus(Map<String, Object> conf, INimbus inimbus) throws Exception {
    +        this(conf, inimbus, null, null, null, null, null);
    +    }
    +    
    +    /**
    +     * Create a Nimbus, optionally injecting collaborators.
    +     * Each of stormClusterState, hostPortInfo, blobStore, leaderElector and groupMapper may be
    +     * null, in which case a default is built from conf.
    +     * @param conf the daemon conf
    +     * @param inimbus the INimbus implementation; prepare is called on it when it is not null
    +     * @param stormClusterState the cluster state, or null for makeStormClusterState(conf)
    +     * @param hostPortInfo this nimbus's host/port, or null for NimbusInfo.fromConf(conf)
    +     * @param blobStore the blob store, or null for Utils.getNimbusBlobStore
    +     * @param leaderElector the leader elector, or null for Zookeeper.zkLeaderElector
    +     * @param groupMapper the group mapper, or null for the plugin configured in conf
    +     */
    +    public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
    +            BlobStore blobStore, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper) throws Exception {
    +        this.conf = conf;
    +        if (hostPortInfo == null) {
    +            hostPortInfo = NimbusInfo.fromConf(conf);
    +        }
    +        this.nimbusHostPortInfo = hostPortInfo;
    +        if (inimbus != null) {
    +            inimbus.prepare(conf, ConfigUtils.masterInimbusDir(conf));
    +        }
    +        
    +        this.inimbus = inimbus;
    +        this.authorizationHandler = StormCommon.mkAuthorizationHandler((String) conf.get(Config.NIMBUS_AUTHORIZER), conf);
    +        this.impersonationAuthorizationHandler = StormCommon.mkAuthorizationHandler((String) conf.get(Config.NIMBUS_IMPERSONATION_AUTHORIZER), conf);
    +        this.submittedCount = new AtomicLong(0);
    +        if (stormClusterState == null) {
    +            stormClusterState =  makeStormClusterState(conf);
    +        }
    +        this.stormClusterState = stormClusterState;
    +        this.submitLock = new Object();
    +        this.credUpdateLock = new Object();
    +        this.heartbeatsCache = new AtomicReference<>(new HashMap<>());
    +        this.downloaders = fileCacheMap(conf);
    +        this.uploaders = fileCacheMap(conf);
    +        // The default blob store needs nimbusHostPortInfo, which was resolved above.
    +        if (blobStore == null) {
    +            blobStore = Utils.getNimbusBlobStore(conf, this.nimbusHostPortInfo);
    +        }
    +        this.blobStore = blobStore;
    +        this.blobDownloaders = makeBlobCacheMap(conf);
    +        this.blobUploaders = makeBlobCacheMap(conf);
    +        this.blobListers = makeBlobListCachMap(conf);
    +        this.uptime = Utils.makeUptimeComputer();
    +        this.validator = Utils.newInstance((String) conf.getOrDefault(Config.NIMBUS_TOPOLOGY_VALIDATOR, DefaultTopologyValidator.class.getName()));
    +        // An error in a timer task is logged and then takes the whole process down (exit code 20).
    +        this.timer = new StormTimer(null, (t, e) -> {
    +            LOG.error("Error while processing event", e);
    +            Utils.exitProcess(20, "Error while processing event");
    +        });
    +        this.scheduler = makeScheduler(conf, inimbus);
    +        // The default leader elector is built with the (already resolved) blob store.
    +        if (leaderElector == null) {
    +            leaderElector = Zookeeper.zkLeaderElector(conf, blobStore);
    +        }
    +        this.leaderElector = leaderElector;
    +        this.idToSchedStatus = new AtomicReference<>(new HashMap<>());
    +        this.nodeIdToResources = new AtomicReference<>(new HashMap<>());
    +        this.idToResources = new AtomicReference<>(new HashMap<>());
    +        this.idToWorkerResources = new AtomicReference<>(new HashMap<>());
    +        this.credRenewers = AuthUtils.GetCredentialRenewers(conf);
    +        this.topologyHistoryLock = new Object();
    +        this.topologyHistoryState = ConfigUtils.nimbusTopoHistoryState(conf);
    +        this.nimbusAutocredPlugins = AuthUtils.getNimbusAutoCredPlugins(conf);
    +        this.nimbusTopologyActionNotifier = createTopologyActionNotifier(conf);
    +        this.clusterConsumerExceutors = makeClusterMetricsConsumerExecutors(conf);
    +        if (groupMapper == null) {
    +            groupMapper = AuthUtils.GetGroupMappingServiceProviderPlugin(conf);
    +        }
    +        this.groupMapper = groupMapper;
    +        this.principalToLocal = AuthUtils.GetPrincipalToLocalPlugin(conf);
    +    }
    +
    +    /**
    +     * @return the conf this nimbus was constructed with (the same map, not a copy)
    +     */
    +    Map<String, Object> getConf() {
    +        return this.conf;
    +    }
    +    
    +    /**
    +     * Replace the authorization handler created in the constructor.
    +     * @param authorizationHandler the handler to use from now on
    +     */
    +    @VisibleForTesting
    +    public void setAuthorizationHandler(IAuthorizer authorizationHandler) {
    +        this.authorizationHandler = authorizationHandler;
    +    }
    +
    +    /**
    +     * @return the cluster state this nimbus works against
    +     */
    +    private IStormClusterState getStormClusterState() {
    +        return this.stormClusterState;
    +    }
    +    
    +    /**
    +     * @return the reference holding the heartbeats cache (the live reference, not a copy)
    +     */
    +    @VisibleForTesting
    +    public AtomicReference<Map<String,Map<List<Integer>,Map<String,Object>>>> getHeartbeatsCache() {
    +        return this.heartbeatsCache;
    +    }
    +
    +    /**
    +     * @return the blob store this nimbus works against
    +     */
    +    private BlobStore getBlobStore() {
    +        return this.blobStore;
    +    }
    +    
    +    /**
    +     * @return whatever the leader elector reports for this nimbus
    +     */
    +    private boolean isLeader() throws Exception {
    +        return this.leaderElector.isLeader();
    +    }
    +    
    +    /**
    +     * Make sure this nimbus is the leader.
    +     * @throws RuntimeException naming the current leader if this nimbus is not the leader
    +     */
    +    private void assertIsLeader() throws Exception {
    +        if (isLeader()) {
    +            return;
    +        }
    +        throw new RuntimeException("not a leader, current leader is " + leaderElector.getLeader());
    +    }
    +    
    +    /**
    +     * @return the inbox location derived from the conf by ConfigUtils.masterInbox
    +     */
    +    private String getInbox() throws IOException {
    +        return ConfigUtils.masterInbox(this.conf);
    +    }
    +    
    +    /**
    +     * Schedule a topology state transition on the timer.
    +     * The transition is run with errorOnNoTransition set to false; any exception it throws
    +     * is rethrown wrapped in a RuntimeException from inside the timer task.
    +     * @param topoId the id of the topology
    +     * @param delaySecs the delay handed to the timer
    +     * @param event the event to apply
    +     * @param args the argument for the event
    +     */
    +    void delayEvent(String topoId, int delaySecs, TopologyActions event, Object args) {
    +        LOG.info("Delaying event {} for {} secs for {}", event, delaySecs, topoId);
    +        timer.schedule(delaySecs, () -> {
    +            try {
    +                transition(topoId, event, args, false);
    +            } catch (Exception e) {
    +                throw new RuntimeException(e);
    +            }
    +        });
    +    }
    +
    +    /**
    +     * Apply the rebalance options stored in a topology's StormBase and reschedule it.
    +     * A fresh StormBase carrying only the changes is written through updateStorm, then
    +     * mkAssignments is called for the topology.
    +     * @param topoId the id of the topology
    +     * @param stormBase the current StormBase holding the rebalance options
    +     */
    +    void doRebalance(String topoId, StormBase stormBase) throws Exception {
    +        RebalanceOptions rbo = stormBase.get_topology_action_options().get_rebalance_options();
    +        StormBase updated = new StormBase();
    +        // the pending action is consumed here, so clear it
    +        updated.set_topology_action_options(null);
    +        updated.set_component_debug(Collections.emptyMap());
    +        
    +        // only override what the rebalance request actually set
    +        if (rbo.is_set_num_executors()) {
    +            updated.set_component_executors(rbo.get_num_executors());
    +        }
    +        
    +        if (rbo.is_set_num_workers()) {
    +            updated.set_num_workers(rbo.get_num_workers());
    +        }
    +        stormClusterState.updateStorm(topoId, updated);
    +        mkAssignments(topoId);
    +    }
    +    
    +    /**
    +     * Resolve a topology name to its id.
    +     * @param topoName the name of the topology
    +     * @return the id of the topology
    +     * @throws NotAliveException if the cluster state has no id for the name
    +     */
    +    private String toTopoId(String topoName) throws NotAliveException {
    +        return stormClusterState.getTopoId(topoName)
    +                .orElseThrow(() -> new NotAliveException(topoName + " is not alive"));
    +    }
    +    
    +    /**
    +     * Same as transition, but addressed by topology name instead of id.
    +     * @param topoName the name of the topology
    +     * @param event the event to apply
    +     * @param eventArg the argument for the event
    +     * @param errorOnNoTransition whether a missing transition is an error
    +     * @throws NotAliveException if the name cannot be resolved to an id
    +     */
    +    private void transitionName(String topoName, TopologyActions event, Object eventArg, boolean errorOnNoTransition) throws Exception {
    +        String topoId = toTopoId(topoName);
    +        transition(topoId, event, eventArg, errorOnNoTransition);
    +    }
    +
    +    private void transition(String topoId, TopologyActions event, Object eventArg) throws Exception {
    +        transition(topoId, event, eventArg, false);
    +    }
    +    
    +    /**
    +     * Apply an event to a topology's state machine.
    +     * Requires this nimbus to be the leader. Under submitLock the topology's StormBase is
    +     * read, the transition registered for (current status, event) is run, and any StormBase
    +     * it returns is written back through updateStorm. A topology that has no StormBase, or
    +     * no status, is logged and skipped.
    +     * @param topoId the id of the topology
    +     * @param event the event to apply
    +     * @param eventArg the argument for the event
    +     * @param errorOnNoTransition if true a missing transition throws, otherwise it is
    +     *     logged (except for STARTUP) and NOOP_TRANSITION is run instead
    +     * @throws RuntimeException if this nimbus is not the leader, or there is no transition
    +     *     and errorOnNoTransition is set
    +     */
    +    private void transition(String topoId, TopologyActions event, Object eventArg, boolean errorOnNoTransition) throws Exception {
    +        LOG.info("TRANSITION: {} {} {} {}", topoId, event, eventArg, errorOnNoTransition);
    +        assertIsLeader();
    +        synchronized(submitLock) {
    +            IStormClusterState clusterState = stormClusterState;
    +            StormBase base = clusterState.stormBase(topoId, null);
    +            // A topology that is already gone has no StormBase at all; treat that the same
    +            // as a missing status instead of failing with a NullPointerException.
    +            TopologyStatus status = base == null ? null : base.get_status();
    +            if (status == null) {
    +                LOG.info("Cannot apply event {} to {} because topology no longer exists", event, topoId);
    +            } else {
    +                TopologyStateTransition transition = TOPO_STATE_TRANSITIONS.get(status).get(event);
    +                if (transition == null) {
    +                    String message = "No transition for event: " + event + ", status: " + status + " storm-id: " + topoId;
    +                    if (errorOnNoTransition) {
    +                        throw new RuntimeException(message);
    +                    }
    +                    
    +                    if (TopologyActions.STARTUP != event) {
    +                        //STARTUP is a system event so don't log an issue
    +                        LOG.info(message);
    +                    }
    +                    transition = NOOP_TRANSITION;
    +                }
    +                StormBase updates = transition.transition(eventArg, this, topoId, base);
    +                if (updates != null) {
    +                    clusterState.updateStorm(topoId, updates);
    +                }
    +            }
    +        }
    +    }
    +    
    +    private void setupStormCode(Map<String, Object> conf, String topoId, String tmpJarLocation, 
    +            Map<String, Object> topoConf, StormTopology topology) throws Exception {
    +        Subject subject = getSubject();
    +        IStormClusterState clusterState = stormClusterState;
    +        BlobStore store = blobStore;
    +        String jarKey = ConfigUtils.masterStormJarKey(topoId);
    +        String codeKey = ConfigUtils.masterStormCodeKey(topoId);
    +        String confKey = ConfigUtils.masterStormConfKey(topoId);
    +        NimbusInfo hostPortInfo = nimbusHostPortInfo;
    +        if (tmpJarLocation != null) {
    +            //in local mode there is no jar
    +            try (FileInputStream fin = new FileInputStream(tmpJarLocation)) {
    +                store.createBlob(jarKey, fin, new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
    +            }
    +            if (store instanceof LocalFsBlobStore) {
    +                clusterState.setupBlobstore(jarKey, hostPortInfo, getVersionForKey(jarKey, hostPortInfo, conf));
    +            }
    +        }
    +        
    +        store.createBlob(confKey, Utils.toCompressedJsonConf(topoConf), new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
    +        if (store instanceof LocalFsBlobStore) {
    +            clusterState.setupBlobstore(confKey, hostPortInfo, getVersionForKey(confKey, hostPortInfo, conf));
    +        }
    +        
    +        store.createBlob(codeKey, Utils.serialize(topology), new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
    +        if (store instanceof LocalFsBlobStore) {
    +            clusterState.setupBlobstore(codeKey, hostPortInfo, getVersionForKey(codeKey, hostPortInfo, conf));
    +        }
    +    }
    +    
    +    private Integer getBlobReplicationCount(String key) throws Exception {
    +        BlobStore store = blobStore;
    +        if (store != null) {
    +            return store.getBlobReplication(key, NIMBUS_SUBJECT);
    +        }
    +        return null;
    +    }
    +    
    +    /**
    +     * Block until the jar, code and conf blobs of a topology have all reached the
    +     * configured minimum replication count, or until the maximum wait time is exceeded.
    +     *
    +     * <p>The replication counts are polled once per second. In local mode the jar is
    +     * not checked and is treated as already being replicated enough.
    +     *
    +     * @param topoConf the topology configuration; {@code Config.TOPOLOGY_MIN_REPLICATION_COUNT}
    +     *     and {@code Config.TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC} are read from it.
    +     * @param topoId the id of the topology whose blobs are checked.
    +     * @throws Exception on any error while reading the replication counts or sleeping.
    +     */
    +    private void waitForDesiredCodeReplication(Map<String, Object> topoConf, String topoId) throws Exception {
    +        int minReplicationCount = Utils.getInt(topoConf.get(Config.TOPOLOGY_MIN_REPLICATION_COUNT));
    +        int maxWaitTime = Utils.getInt(topoConf.get(Config.TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC));
    +        int jarCount = minReplicationCount;
    +        if (!ConfigUtils.isLocalMode(topoConf)) {
    +            jarCount = getBlobReplicationCount(ConfigUtils.masterStormJarKey(topoId));
    +        }
    +        int codeCount = getBlobReplicationCount(ConfigUtils.masterStormCodeKey(topoId));
    +        int confCount = getBlobReplicationCount(ConfigUtils.masterStormConfKey(topoId));
    +        long totalWaitTime = 0;
    +        //When is this ever null?
    +        if (blobStore != null) {
    +            // Keep waiting while ANY of the blobs is still below the minimum; the desired
    +            // replication is only achieved once all three of them have reached it.
    +            while (jarCount < minReplicationCount ||
    +                    codeCount < minReplicationCount ||
    +                    confCount < minReplicationCount) {
    +                if (maxWaitTime > 0 && totalWaitTime > maxWaitTime) {
    +                    LOG.info("desired replication count of {} not achieved but we have hit the max wait time {}"
    +                            + " so moving on with replication count for conf key = {} for code key = {} for jar key = {}",
    +                            minReplicationCount, maxWaitTime, confCount, codeCount, jarCount);
    +                    return;
    +                }
    +                LOG.info("WAITING... {} <? {} {} {}", minReplicationCount, jarCount, codeCount, confCount);
    +                LOG.info("WAITING... {} <? {}", totalWaitTime, maxWaitTime);
    +                Time.sleepSecs(1);
    +                // One iteration sleeps for one second, so this counts (roughly) seconds waited.
    +                totalWaitTime++;
    +                if (!ConfigUtils.isLocalMode(topoConf)) {
    +                    jarCount = getBlobReplicationCount(ConfigUtils.masterStormJarKey(topoId));
    +                }
    +                codeCount = getBlobReplicationCount(ConfigUtils.masterStormCodeKey(topoId));
    +                confCount = getBlobReplicationCount(ConfigUtils.masterStormConfKey(topoId));
    +            }
    +        }
    +        LOG.info("desired replication count {} achieved, current-replication-count for conf key = {},"
    +                + " current-replication-count for code key = {}, current-replication-count for jar key = {}",
    +                minReplicationCount, confCount, codeCount, jarCount);
    +    }
    +    
    +    private TopologyDetails readTopologyDetails(String topoId) throws NotAliveException, KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
    +        StormBase base = stormClusterState.stormBase(topoId, null);
    +        if (base == null) {
    +            if (topoId == null) {
    +                throw new NullPointerException();
    +            }
    +            throw new NotAliveException(topoId);
    +        }
    +        BlobStore store = blobStore;
    +        Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
    +        StormTopology topo = readStormTopologyAsNimbus(topoId, store);
    +        Map<List<Integer>, String> rawExecToComponent = computeExecutorToComponent(topoId);
    +        Map<ExecutorDetails, String> executorsToComponent = new HashMap<>();
    +        for (Entry<List<Integer>, String> entry: rawExecToComponent.entrySet()) {
    +            List<Integer> execs = entry.getKey();
    +            ExecutorDetails execDetails = new ExecutorDetails(execs.get(0), execs.get(1));
    +            executorsToComponent.put(execDetails, entry.getValue());
    +        }
    +        
    +        return new TopologyDetails(topoId, topoConf, topo, base.get_num_workers(), executorsToComponent, base.get_launch_time_secs());
    +    }
    +    
    +    private void updateHeartbeats(String topoId, Set<List<Integer>> allExecutors, Assignment existingAssignment) {
    +        LOG.debug("Updating heartbeats for {} {}", topoId, allExecutors);
    +        IStormClusterState state = stormClusterState;
    +        Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertExecutorBeats(state.executorBeats(topoId, exis
    --- End diff --
    
    If we want to fix an issue caused by STORM-2126, I would propose that we do that in a separate JIRA, mostly because STORM-2126 went into the 1.1 line as well as the 2.0 line.  If that fix goes in first, I would then be happy to translate it to Java.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---