You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:46 UTC
[23/35] storm git commit: update
update
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/75364892
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/75364892
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/75364892
Branch: refs/heads/master
Commit: 753648927bb2c82443ede9525200bb6197f8d3b6
Parents: 724f5d2 0100898
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Mar 25 10:08:57 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Mar 25 10:48:05 2016 +0800
----------------------------------------------------------------------
CHANGELOG.md | 3 +
bin/storm.py | 5 +-
.../storm/starter/tools/SlotBasedCounter.java | 12 +--
external/sql/storm-sql-core/pom.xml | 18 ++++
external/storm-hbase/pom.xml | 2 +-
.../jvm/org/apache/storm/kafka/KafkaUtils.java | 4 +
.../org/apache/storm/daemon/builtin_metrics.clj | 97 --------------------
.../clj/org/apache/storm/daemon/executor.clj | 37 ++++----
.../apache/storm/daemon/local_supervisor.clj | 2 +-
.../src/clj/org/apache/storm/daemon/task.clj | 8 +-
.../src/jvm/org/apache/storm/StormTimer.java | 1 +
.../daemon/metrics/BuiltinBoltMetrics.java | 78 ++++++++++++++++
.../storm/daemon/metrics/BuiltinMetrics.java | 33 +++++++
.../daemon/metrics/BuiltinMetricsUtil.java | 79 ++++++++++++++++
.../daemon/metrics/BuiltinSpoutMetrics.java | 64 +++++++++++++
.../daemon/metrics/SpoutThrottlingMetrics.java | 57 ++++++++++++
.../daemon/supervisor/SyncProcessEvent.java | 4 +-
.../daemon/supervisor/SyncSupervisorEvent.java | 16 +---
.../supervisor/timer/SupervisorHeartbeat.java | 4 +-
.../jvm/org/apache/storm/drpc/DRPCSpout.java | 51 +++++++---
.../jvm/org/apache/storm/stats/StatsUtil.java | 4 +-
.../test/clj/org/apache/storm/drpc_test.clj | 69 ++++++++++++--
storm-dist/binary/pom.xml | 10 --
storm-dist/binary/src/main/assembly/binary.xml | 4 +-
24 files changed, 484 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/75364892/bin/storm.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/75364892/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
index b28ae08,0000000..ba3c87e
mode 100644,000000..100644
--- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj
@@@ -1,64 -1,0 +1,64 @@@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.local-supervisor
+ (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils]
+ [org.apache.storm.utils Utils ConfigUtils]
+ [org.apache.storm ProcessSimulator])
+ (:use [org.apache.storm.daemon common]
+ [org.apache.storm log])
+ (:require [org.apache.storm.daemon [worker :as worker] ])
+ (:require [clojure.string :as str])
+ (:gen-class))
+
+(defn launch-local-worker [supervisorData stormId port workerId resources] ; NOTE: `resources` is accepted but not used in this body
+ (let [conf (.getConf supervisorData)
+ pid (Utils/uuid) ; fresh uuid used as the id under which the worker is registered below
+ worker (worker/mk-worker conf ; construct the worker via worker/mk-worker
+ (.getSharedContext supervisorData)
+ stormId
+ (.getAssignmentId supervisorData)
+ (int port)
+ workerId)]
+ (ConfigUtils/setWorkerUserWSE conf workerId "") ; record an empty user string for this worker
+ (ProcessSimulator/registerProcess pid worker) ; register the worker with the ProcessSimulator under pid
+ (.put (.getWorkerThreadPids supervisorData) workerId pid) ; remember workerId -> pid on the supervisor data
+ ))
+(defn shutdown-local-worker [supervisorData worker-manager workerId] ; same steps as SyncProcessEvent.killWorker, written in Clojure
+ (log-message "shutdown-local-worker")
+ (let [supervisor-id (.getSupervisorId supervisorData)
+ worker-pids (.getWorkerThreadPids supervisorData)
+ dead-workers (.getDeadWorkers supervisorData)]
+ (.shutdownWorker worker-manager supervisor-id workerId worker-pids)
+ (if (.cleanupWorker worker-manager workerId) ; only forget the dead worker once cleanup reports success
+ (.remove dead-workers workerId))))
+
+(defn local-process
+ "Create a local process event"
+ []
+ ;; The docstring must precede the argument vector; placed after it, the string is
+ ;; just an expression that is evaluated and discarded.
+ ;; The proxy overrides SyncProcessEvent so that launching and killing a worker are
+ ;; delegated to launch-local-worker and shutdown-local-worker.
+ (proxy [SyncProcessEvent] []
+ (launchLocalWorker [supervisorData stormId port workerId resources]
+ (launch-local-worker supervisorData stormId port workerId resources))
- (shutWorker [supervisorData worker-manager workerId] (shutdown-local-worker supervisorData worker-manager workerId))))
++ (killWorker [supervisorData worker-manager workerId] (shutdown-local-worker supervisorData worker-manager workerId))))
+
+
+(defserverfn mk-local-supervisor [conf shared-context isupervisor]
+ ;; Creates a Supervisor, installs the local SyncProcessEvent proxy on it and
+ ;; returns the result of .mkSupervisor.
+ (log-message "Starting local Supervisor with conf " conf)
+ ;; Guard: this entry point is only valid when conf reports local mode.
+ (if (not (ConfigUtils/isLocalMode conf))
+ (throw
+ (IllegalArgumentException. "Cannot start server in distributed mode!")))
+ (let [local-process (local-process)
+ supervisor-server (Supervisor.)]
+ (.setLocalSyncProcess supervisor-server local-process)
+ (.mkSupervisor supervisor-server conf shared-context isupervisor)))
http://git-wip-us.apache.org/repos/asf/storm/blob/75364892/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
index 41fa01d,0000000..feb8e03
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@@ -1,428 -1,0 +1,428 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Synchronizes the worker processes on this supervisor with the local assignments:
+ * 1. Workers to kill are those in allocated that are dead or disallowed.
+ * 2. Kill the ones that should be dead: read pids, kill -9 and individually remove file; rmr
+ *    heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log).
+ * 3. Of the rest, figure out what assignments aren't yet satisfied.
+ * 4. Generate new worker ids, write new "approved workers" to LS.
+ * 5. Create local dir for worker id.
+ * 6. Launch new workers (give worker-id, port, and supervisor-id).
+ * 7. Wait for workers launch.
+ */
+public class SyncProcessEvent implements Runnable {
+
+ // The logger is never reassigned, so it is final (same declaration style as SyncSupervisorEvent).
+ private static final Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
+
+ // Executor entry that matchesAssignment removes from a heartbeat's executor list before comparing it with the assignment.
+ public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1);
+
+ // Both are populated by init(SupervisorData); they stay null after the no-arg constructor until init is called.
+ private LocalState localState;
+ private SupervisorData supervisorData;
+
+ /**
+  * Exit hook passed to the worker manager when a distributed worker is launched:
+  * it logs the exit code and records the worker id in the supervisor's dead-worker set.
+  */
+ private class ProcessExitCallback implements Utils.ExitCodeCallable {
+     private final String workerId;
+     private final String logPrefix;
+
+     public ProcessExitCallback(String logPrefix, String workerId) {
+         this.workerId = workerId;
+         this.logPrefix = logPrefix;
+     }
+
+     /** No-argument variant; does nothing and returns {@code null}. */
+     @Override
+     public Object call() throws Exception {
+         return null;
+     }
+
+     /** Logs the exit code, adds the worker id to the dead-worker set and returns {@code null}. */
+     @Override
+     public Object call(int exitCode) {
+         LOG.info("{} exited with code: {}", logPrefix, exitCode);
+         supervisorData.getDeadWorkers().add(workerId);
+         return null;
+     }
+ }
+
+ public SyncProcessEvent(){ // leaves supervisorData/localState unset; init(SupervisorData) must be called before run()
+
+ }
+ public SyncProcessEvent(SupervisorData supervisorData) { // fully initialised variant, delegates to init
+ init(supervisorData);
+ }
+
+ // TODO: the no-arg constructor plus init() are intended for the local supervisor (the Clojure proxy in local_supervisor.clj); remove them after porting worker.clj to java
+ public void init(SupervisorData supervisorData){
+ this.supervisorData = supervisorData;
+ this.localState = supervisorData.getLocalState(); // cached so run() does not have to go through supervisorData
+ }
+
+ @Override
+ public void run() { // one sync pass: classify known workers, kill the non-VALID ones, launch workers for unsatisfied assignments, then wait for them
+ LOG.debug("Syncing processes");
+ try {
+ Map conf = supervisorData.getConf();
+ Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
+
+ if (assignedExecutors == null) {
+ assignedExecutors = new HashMap<>();
+ }
+ int now = Time.currentTimeSecs();
+
+ Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now);
+
+ Set<String> keeperWorkerIds = new HashSet<>(); // worker ids whose state is VALID
+ Set<Integer> keepPorts = new HashSet<>(); // ports reported by the heartbeats of those VALID workers
+ for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
+ StateHeartbeat stateHeartbeat = entry.getValue();
+ if (stateHeartbeat.getState() == State.VALID) {
+ keeperWorkerIds.add(entry.getKey());
+ keepPorts.add(stateHeartbeat.getHeartbeat().get_port());
+ }
+ }
+ Map<Integer, LocalAssignment> reassignExecutors = getReassignExecutors(assignedExecutors, keepPorts); // assignments on ports without a VALID worker
+ Map<Integer, String> newWorkerIds = new HashMap<>();
+ for (Integer port : reassignExecutors.keySet()) {
+ newWorkerIds.put(port, Utils.uuid()); // one fresh worker id per port that needs a worker
+ }
+ LOG.debug("Syncing processes"); // NOTE(review): same message as the one logged at the top of run()
+ LOG.debug("Assigned executors: {}", assignedExecutors);
+ LOG.debug("Allocated: {}", localWorkerStats);
+
+ for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
+ StateHeartbeat stateHeartbeat = entry.getValue();
+ if (stateHeartbeat.getState() != State.VALID) { // every non-VALID worker is killed before new ones are started
+ LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now,
+ stateHeartbeat.getState(), stateHeartbeat.getHeartbeat());
- shutWorker(supervisorData, supervisorData.getWorkerManager(), entry.getKey());
++ killWorker(supervisorData, supervisorData.getWorkerManager(), entry.getKey());
+ }
+ }
+ // start new workers
+ Map<String, Integer> newWorkerPortToIds = startNewWorkers(newWorkerIds, reassignExecutors); // despite the name, this maps worker id -> port
+
+ Map<String, Integer> allWorkerPortToIds = new HashMap<>(); // kept workers plus newly launched ones, worker id -> port
+ Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
+ for (String keeper : keeperWorkerIds) {
+ allWorkerPortToIds.put(keeper, approvedWorkers.get(keeper));
+ }
+ allWorkerPortToIds.putAll(newWorkerPortToIds);
+ localState.setApprovedWorkers(allWorkerPortToIds); // persist the approved set before waiting for the launches
+ waitForWorkersLaunch(conf, newWorkerPortToIds.keySet());
+
+ } catch (Exception e) {
+ LOG.error("Failed Sync Process", e);
+ throw Utils.wrapInRuntime(e); // Runnable.run cannot throw checked exceptions
+ }
+
+ }
+
+ protected void waitForWorkersLaunch(Map conf, Set<String> workerIds) throws Exception {
+ int startTime = Time.currentTimeSecs();
+ int timeOut = (int) conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS);
+ for (String workerId : workerIds) {
+ LocalState localState = ConfigUtils.workerState(conf, workerId);
+ while (true) {
+ LSWorkerHeartbeat hb = localState.getWorkerHeartBeat();
+ if (hb != null || (Time.currentTimeSecs() - startTime) > timeOut)
+ break;
+ LOG.info("{} still hasn't started", workerId);
+ Time.sleep(500);
+ }
+ if (localState.getWorkerHeartBeat() == null) {
+ LOG.info("Worker {} failed to start", workerId);
+ }
+ }
+ }
+
+ /**
+  * Returns a copy of {@code assignExecutors} without the entries whose port is in {@code keepPorts}.
+  *
+  * @param assignExecutors port to assignment map; not modified
+  * @param keepPorts ports to leave out of the result
+  * @return a new map holding the remaining port to assignment entries
+  */
+ protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
+     Map<Integer, LocalAssignment> pending = new HashMap<>(assignExecutors);
+     pending.keySet().removeAll(keepPorts);
+     return pending;
+ }
+
+ /**
+  * Classifies every worker that has a heartbeat entry on this supervisor.
+  *
+  * @param supervisorData supplies the conf, the local state (approved workers) and the dead-worker set
+  * @param assignedExecutors port to local assignment map used to validate each heartbeat
+  * @param now current supervisor time in seconds, passed to the heartbeat timeout check
+  * @return map from worker id to its {@link StateHeartbeat}; the heartbeat inside is null for a worker in state NOT_STARTED
+  * @throws Exception propagated from the heartbeat and local-state reads
+  */
+ public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception {
+     Map conf = supervisorData.getConf();
+     LocalState supervisorState = supervisorData.getLocalState();
+     Map<String, LSWorkerHeartbeat> heartbeatsById = SupervisorUtils.readWorkerHeartbeats(conf);
+     Map<String, Integer> approvedWorkers = supervisorState.getApprovedWorkers();
+     Set<String> approvedIds = (approvedWorkers == null)
+             ? new HashSet<String>()
+             : new HashSet<>(approvedWorkers.keySet());
+
+     Map<String, StateHeartbeat> statsByWorkerId = new HashMap<>();
+     for (Map.Entry<String, LSWorkerHeartbeat> hbEntry : heartbeatsById.entrySet()) {
+         String workerId = hbEntry.getKey();
+         LSWorkerHeartbeat heartbeat = hbEntry.getValue();
+         State state;
+         if (heartbeat == null) {
+             state = State.NOT_STARTED;
+         } else if (!(approvedIds.contains(workerId) && matchesAssignment(heartbeat, assignedExecutors))) {
+             // Not approved, or the heartbeat no longer matches what is assigned to its port.
+             state = State.DISALLOWED;
+         } else if (supervisorData.getDeadWorkers().contains(workerId)) {
+             LOG.info("Worker Process {} has died", workerId);
+             state = State.TIMED_OUT;
+         } else if (SupervisorUtils.isWorkerHbTimedOut(now, heartbeat, conf)) {
+             state = State.TIMED_OUT;
+         } else {
+             state = State.VALID;
+         }
+         LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, heartbeat, now);
+         statsByWorkerId.put(workerId, new StateHeartbeat(state, heartbeat));
+     }
+     return statsByWorkerId;
+ }
+
+ /**
+  * Checks whether a worker heartbeat still corresponds to what is assigned to its port.
+  *
+  * @param whb heartbeat read for the worker
+  * @param assignedExecutors port to local assignment map
+  * @return true when the port has an assignment for the same topology and the heartbeat's executors
+  *         (ignoring {@link #SYSTEM_EXECUTOR_INFO}) have the same count as, and contain all of, the assigned executors
+  */
+ protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) {
+     LocalAssignment localAssignment = assignedExecutors.get(whb.get_port());
+     if (localAssignment == null || !localAssignment.get_topology_id().equals(whb.get_topology_id())) {
+         return false;
+     }
+     List<ExecutorInfo> heartbeatExecutors = new ArrayList<>(whb.get_executors());
+     // remove SYSTEM_EXECUTOR_ID
+     heartbeatExecutors.remove(SYSTEM_EXECUTOR_INFO);
+     List<ExecutorInfo> assigned = localAssignment.get_executors();
+
+     if (assigned.size() != heartbeatExecutors.size()) {
+         return false;
+     }
+     // The assigned executors must be looked up in the heartbeat's list. The previous loop
+     // searched the assigned list for its own elements, which can never fail.
+     return heartbeatExecutors.containsAll(assigned);
+ }
+
+ /**
+ * Launch a worker in local mode. This base implementation does nothing; the local supervisor
+ * overrides it through a Clojure proxy (local-process in local_supervisor.clj).
+ */
+ protected void launchLocalWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException {
+ // TODO: port this function after porting worker to java
+ }
+
+ /**
+  * Prepares the on-disk state for a worker (log metadata, worker user, artifacts link, blobstore links)
+  * and then asks the worker manager to launch it.
+  *
+  * @param deadWorkers dead-worker set from which {@code workerId} is removed before launching; may be null
+  * @throws IOException propagated from the preparation steps or the launch
+  */
+ protected void launchDistributedWorker(IWorkerManager workerManager, Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId,
+         WorkerResources resources, ConcurrentHashSet deadWorkers) throws IOException {
+     Map topologyConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+     String submitter = (String) topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+     writeLogMetadata(topologyConf, submitter, workerId, stormId, port, conf);
+     ConfigUtils.setWorkerUserWSE(conf, workerId, submitter);
+     createArtifactsLink(conf, stormId, port, workerId);
+
+     if (deadWorkers != null) {
+         deadWorkers.remove(workerId);
+     }
+     createBlobstoreLinks(conf, stormId, workerId);
+
+     // The callback puts the worker id back into the dead-worker set when the process exits.
+     ProcessExitCallback onExit = new ProcessExitCallback("Worker Process " + workerId, workerId);
+     workerManager.launchWorker(supervisorId, assignmentId, stormId, port, workerId, resources, onExit);
+ }
+
+ /**
+  * Launches a worker for every port in {@code reassignExecutors} whose topology files are present locally.
+  *
+  * @param newWorkerIds port to freshly generated worker id
+  * @param reassignExecutors port to assignment for the ports that need a new worker
+  * @return worker id to port for every port whose required topology files existed
+  * @throws IOException if a worker directory cannot be created, or propagated from the launch
+  */
+ protected Map<String, Integer> startNewWorkers(Map<Integer, String> newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws IOException {
+
+     Map<String, Integer> launched = new HashMap<>();
+     Map conf = supervisorData.getConf();
+     String supervisorId = supervisorData.getSupervisorId();
+     String clusterMode = ConfigUtils.clusterMode(conf);
+
+     for (Map.Entry<Integer, LocalAssignment> portAssignment : reassignExecutors.entrySet()) {
+         Integer port = portAssignment.getKey();
+         LocalAssignment assignment = portAssignment.getValue();
+         String workerId = newWorkerIds.get(port);
+         String stormId = assignment.get_topology_id();
+         WorkerResources resources = assignment.get_resources();
+
+         // A worker is only launched once the required topology files exist.
+         if (!SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) {
+             LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", assignment,
+                     supervisorData.getSupervisorId(), port, workerId);
+             continue;
+         }
+
+         String pidsDir = ConfigUtils.workerPidsRoot(conf, workerId);
+         String heartbeatsDir = ConfigUtils.workerHeartbeatsRoot(conf, workerId);
+
+         LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port,
+                 workerId);
+
+         FileUtils.forceMkdir(new File(pidsDir));
+         FileUtils.forceMkdir(new File(ConfigUtils.workerTmpRoot(conf, workerId)));
+         FileUtils.forceMkdir(new File(heartbeatsDir));
+
+         if (clusterMode.endsWith("distributed")) {
+             launchDistributedWorker(supervisorData.getWorkerManager(), conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources, supervisorData.getDeadWorkers());
+         } else if (clusterMode.endsWith("local")) {
+             launchLocalWorker(supervisorData, stormId, port.longValue(), workerId, resources);
+         }
+         launched.put(workerId, port);
+     }
+     return launched;
+ }
+
+ /**
+  * Collects the submitter, the worker id and the union of the configured log/topology groups and users,
+  * then writes them through {@link #writeLogMetadataToYamlFile}.
+  *
+  * @param stormconf topology configuration the group and user lists are read from
+  * @param user topology submitter
+  * @param workerId id of the worker the metadata belongs to
+  * @param stormId topology id
+  * @param port worker port
+  * @param conf supervisor configuration, passed on to the YAML writer
+  * @throws IOException propagated from writing the metadata file
+  */
+ public void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, Long port, Map conf) throws IOException {
+     Map data = new HashMap();
+     data.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+     data.put("worker-id", workerId);
+
+     // Groups: LOGS_GROUPS (may be absent, e.g. in supervisor-test) merged with TOPOLOGY_GROUPS.
+     Set<String> groupNames = new HashSet<>();
+     List<String> confLogsGroups = (List<String>) stormconf.get(Config.LOGS_GROUPS);
+     if (confLogsGroups != null) {
+         for (String groupName : confLogsGroups) {
+             groupNames.add(groupName);
+         }
+     }
+     List<String> confTopologyGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS);
+     if (confTopologyGroups != null) {
+         groupNames.addAll(confTopologyGroups);
+     }
+     data.put(Config.LOGS_GROUPS, groupNames.toArray());
+
+     // Users: LOGS_USERS merged with TOPOLOGY_USERS.
+     Set<String> userNames = new HashSet<>();
+     List<String> confLogsUsers = (List<String>) stormconf.get(Config.LOGS_USERS);
+     if (confLogsUsers != null) {
+         for (String userName : confLogsUsers) {
+             userNames.add(userName);
+         }
+     }
+     List<String> confTopologyUsers = (List<String>) stormconf.get(Config.TOPOLOGY_USERS);
+     if (confTopologyUsers != null) {
+         for (String userName : confTopologyUsers) {
+             userNames.add(userName);
+         }
+     }
+     data.put(Config.LOGS_USERS, userNames.toArray());
+
+     writeLogMetadataToYamlFile(stormId, port, data, conf);
+ }
+
+ /**
+  * Dumps {@code data} as YAML into the log metadata file for the given topology and port.
+  * The parent directory is created when it does not exist; when workers run as the submitting user it is
+  * additionally set up through {@code SupervisorUtils.setupStormCodeDir}, because run worker as user needs
+  * the directory to have special permissions or it is insecure.
+  *
+  * @param stormId topology id, used to locate the metadata file and the topology conf
+  * @param port worker port, used to locate the metadata file
+  * @param data metadata map to dump
+  * @param conf supervisor configuration
+  * @throws IOException if the directory cannot be prepared or the file cannot be written
+  */
+ public void writeLogMetadataToYamlFile(String stormId, Long port, Map data, Map conf) throws IOException {
+     File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port.intValue());
+
+     if (!Utils.checkFileExists(file.getParent())) {
+         if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+             FileUtils.forceMkdir(file.getParentFile());
+             SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), file.getParentFile().getCanonicalPath());
+         } else {
+             file.getParentFile().mkdirs();
+         }
+     }
+     Yaml yaml = new Yaml();
+     // try-with-resources closes the writer on every path and keeps a dump() failure as the
+     // primary exception instead of letting a close() failure replace it.
+     try (FileWriter writer = new FileWriter(file)) {
+         yaml.dump(data, writer);
+     }
+ }
+
+ /**
+  * Creates a symlink named "artifacts" in the worker directory that points at the worker's
+  * port directory under the topology artifacts root. Nothing happens when the worker directory does not exist.
+  *
+  * @param conf supervisor configuration
+  * @param stormId topology id, used to resolve the artifacts root
+  * @param port worker port, used as the link target name
+  * @param workerId worker id, used to resolve the worker directory
+  * @throws IOException propagated from creating the symlink
+  */
+ protected void createArtifactsLink(Map conf, String stormId, Long port, String workerId) throws IOException {
+     String workerDir = ConfigUtils.workerRoot(conf, workerId);
+     String topoDir = ConfigUtils.workerArtifactsRoot(conf, stormId);
+     if (!Utils.checkFileExists(workerDir)) {
+         return;
+     }
+     Utils.createSymlink(workerDir, topoDir, "artifacts", String.valueOf(port));
+ }
+
+ /**
+  * Creates symlinks in the worker launch directory for the resources sub-directory and for every blob
+  * listed under {@code Config.TOPOLOGY_BLOBSTORE_MAP}. A blob is linked under its "localname" when one
+  * is configured, otherwise under its key.
+  *
+  * @param conf supervisor configuration
+  * @param stormId topology id, used to resolve the storm dist root and the topology conf
+  * @param workerId worker id, used to resolve the worker root
+  * @throws IOException propagated from reading the topology conf or creating a symlink
+  */
+ protected void createBlobstoreLinks(Map conf, String stormId, String workerId) throws IOException {
+     String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+     Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+     String workerRoot = ConfigUtils.workerRoot(conf, workerId);
+     Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+
+     List<String> blobFileNames = new ArrayList<>();
+     if (blobstoreMap != null) {
+         for (Map.Entry<String, Map<String, Object>> blob : blobstoreMap.entrySet()) {
+             String blobKey = blob.getKey();
+             Map<String, Object> blobInfo = blob.getValue();
+             boolean hasLocalName = blobInfo != null && blobInfo.containsKey("localname");
+             blobFileNames.add(hasLocalName ? (String) blobInfo.get("localname") : blobKey);
+         }
+     }
+
+     // Only used for the log line: the resources directory followed by all blob names.
+     List<String> resourceFileNames = new ArrayList<>();
+     resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR);
+     resourceFileNames.addAll(blobFileNames);
+     LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", workerId, stormId, resourceFileNames.size(), resourceFileNames);
+
+     Utils.createSymlink(workerRoot, stormRoot, ConfigUtils.RESOURCES_SUBDIR);
+     for (String blobFileName : blobFileNames) {
+         Utils.createSymlink(workerRoot, stormRoot, blobFileName, blobFileName);
+     }
+ }
+
- public void shutWorker(SupervisorData supervisorData, IWorkerManager workerManager, String workerId) throws IOException, InterruptedException{
++ public void killWorker(SupervisorData supervisorData, IWorkerManager workerManager, String workerId) throws IOException, InterruptedException{ // named shutWorker on the other side of this merge; shuts the worker down and cleans up after it
+ workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids());
+ boolean success = workerManager.cleanupWorker(workerId);
+ if (success){ // only drop the id from the dead-worker set once cleanup reports success
+ supervisorData.getDeadWorkers().remove(workerId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/75364892/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
index 4549d4d,0000000..4f33c85
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
@@@ -1,632 -1,0 +1,626 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.*;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.LocalizedResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.*;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SyncSupervisorEvent implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class);
+
+ private EventManager syncSupEventManager;
+ private EventManager syncProcessManager;
+ private IStormClusterState stormClusterState;
+ private LocalState localState;
+ private SyncProcessEvent syncProcesses;
+ private SupervisorData supervisorData;
+
+ /**
+  * Creates the supervisor sync event.
+  *
+  * @param supervisorData shared supervisor data; the cluster state and local state are taken from it
+  * @param syncProcesses process sync event that run() queues on {@code syncProcessManager}
+  * @param syncSupEventManager event manager handed to the callback that run() registers for assignment reads
+  * @param syncProcessManager event manager that receives {@code syncProcesses} at the end of run()
+  */
+ public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager,
+         EventManager syncProcessManager) {
+     this.supervisorData = supervisorData;
+     this.stormClusterState = supervisorData.getStormClusterState();
+     this.localState = supervisorData.getLocalState();
+     this.syncProcesses = syncProcesses;
+     this.syncSupEventManager = syncSupEventManager;
+     this.syncProcessManager = syncProcessManager;
+ }
+
+ @Override
+ public void run() { // one sync pass: read assignments, download missing topology code, store the new local assignment, remove unused code, then queue the process sync
+ try {
+ Map conf = supervisorData.getConf();
+ Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager); // passed to the cluster-state reads below as their callback
+ List<String> stormIds = stormClusterState.assignments(syncCallback);
+ Map<String, Map<String, Object>> assignmentsSnapshot =
+ getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback);
+ Map<String, List<ProfileRequest>> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds);
+
+ Set<String> allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf);
+ Map<String, String> stormcodeMap = readStormCodeLocations(assignmentsSnapshot);
+ Map<Integer, LocalAssignment> existingAssignment = localState.getLocalAssignmentsMap();
+ if (existingAssignment == null) {
+ existingAssignment = new HashMap<>();
+ }
+
+ Map<Integer, LocalAssignment> allAssignment =
+ readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry());
+
+ Map<Integer, LocalAssignment> newAssignment = new HashMap<>(); // subset of allAssignment whose ports the ISupervisor confirms
+ Set<String> assignedStormIds = new HashSet<>();
+
+ for (Map.Entry<Integer, LocalAssignment> entry : allAssignment.entrySet()) {
+ if (supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) {
+ newAssignment.put(entry.getKey(), entry.getValue());
+ assignedStormIds.add(entry.getValue().get_topology_id());
+ }
+ }
+
+ Set<String> srashStormIds = verifyDownloadedFiles(conf, supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds); // ids returned by the check; excluded from downloadedStormIds below
+ Set<String> downloadedStormIds = new HashSet<>();
+ downloadedStormIds.addAll(allDownloadedTopologyIds);
+ downloadedStormIds.removeAll(srashStormIds);
+
+ LOG.debug("Synchronizing supervisor");
+ LOG.debug("Storm code map: {}", stormcodeMap);
+ LOG.debug("All assignment: {}", allAssignment);
+ LOG.debug("New assignment: {}", newAssignment);
+ LOG.debug("Assigned Storm Ids {}", assignedStormIds);
+ LOG.debug("All Downloaded Ids {}", allDownloadedTopologyIds);
+ LOG.debug("Checked Downloaded Ids {}", srashStormIds);
+ LOG.debug("Downloaded Ids {}", downloadedStormIds);
+ LOG.debug("Storm Ids Profiler Actions {}", stormIdToProfilerActions);
+
+ // download code first
+ // This might take awhile
+ // - should this be done separately from usual monitoring?
+ // should we only download when topology is assigned to this supervisor?
+ for (Map.Entry<String, String> entry : stormcodeMap.entrySet()) {
+ String stormId = entry.getKey();
+ if (!downloadedStormIds.contains(stormId) && assignedStormIds.contains(stormId)) {
+ LOG.info("Downloading code for storm id {}.", stormId);
+ try {
+ downloadStormCode(conf, stormId, entry.getValue(), supervisorData.getLocalizer());
+ } catch (Exception e) {
+ if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) { // nimbus availability problems are logged and tolerated
+ LOG.warn("Nimbus leader was not available.", e);
+ } else if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
+ LOG.warn("There was a connection problem with nimbus.", e);
+ } else {
+ throw e;
+ }
+ }
+ LOG.info("Finished downloading code for storm id {}", stormId); // NOTE(review): also logged after one of the tolerated failures above
+ }
+ }
+
+ LOG.debug("Writing new assignment {}", newAssignment);
+
+ Set<Integer> killWorkers = new HashSet<>(); // ports that had an assignment before but have none in newAssignment
+ killWorkers.addAll(existingAssignment.keySet());
+ killWorkers.removeAll(newAssignment.keySet());
+ for (Integer port : killWorkers) {
+ supervisorData.getiSupervisor().killedWorker(port);
+ }
+
+ killExistingWorkersWithChangeInComponents(supervisorData, existingAssignment, newAssignment);
+
+ supervisorData.getiSupervisor().assigned(newAssignment.keySet());
+ localState.setLocalAssignmentsMap(newAssignment);
+ supervisorData.setAssignmentVersions(assignmentsSnapshot);
+ supervisorData.setStormIdToProfileActions(stormIdToProfilerActions);
+
+ Map<Long, LocalAssignment> convertNewAssignment = new HashMap<>(); // same entries as newAssignment, keyed by Long because setCurrAssignment takes Long ports
+ for (Map.Entry<Integer, LocalAssignment> entry : newAssignment.entrySet()) {
+ convertNewAssignment.put(entry.getKey().longValue(), entry.getValue());
+ }
+ supervisorData.setCurrAssignment(convertNewAssignment);
+ // remove any downloaded code that's no longer assigned or active
+ // important that this happens after setting the local assignment so that
+ // synchronize-supervisor doesn't try to launch workers for which the
+ // resources don't exist
+ if (Utils.isOnWindows()) {
+ shutdownDisallowedWorkers();
+ }
+ for (String stormId : allDownloadedTopologyIds) {
+ if (!stormcodeMap.containsKey(stormId)) {
+ LOG.info("Removing code for storm id {}.", stormId);
+ rmTopoFiles(conf, stormId, supervisorData.getLocalizer(), true);
+ }
+ }
+ syncProcessManager.add(syncProcesses); // hand over to the process sync event
+ } catch (Exception e) {
+ LOG.error("Failed to Sync Supervisor", e);
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ /**
+ * Kills the VALID worker on every port that is present in both the existing and the new assignment
+ * but whose set of executors differs between the two.
+ *
+ * @param supervisorData supplies the local state and the worker manager
+ * @param existingAssignment port to assignment as stored before this sync
+ * @param newAssignment port to assignment computed by this sync
+ * @throws Exception propagated from reading the worker stats or killing a worker
+ */
+ private void killExistingWorkersWithChangeInComponents(SupervisorData supervisorData, Map<Integer, LocalAssignment> existingAssignment,
+ Map<Integer, LocalAssignment> newAssignment) throws Exception {
+ LocalState localState = supervisorData.getLocalState();
+ Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
+ if (assignedExecutors == null) {
+ assignedExecutors = new HashMap<>();
+ }
+ int now = Time.currentTimeSecs();
+ Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now);
+ // port -> worker id, restricted to workers whose state is VALID
+ Map<Integer, String> vaildPortToWorkerIds = new HashMap<>();
+ for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()) {
+ String workerId = entry.getKey();
+ StateHeartbeat stateHeartbeat = entry.getValue();
+ if (stateHeartbeat != null && stateHeartbeat.getState() == State.VALID) {
+ vaildPortToWorkerIds.put(stateHeartbeat.getHeartbeat().get_port(), workerId);
+ }
+ }
+
+ // ports that appear in both the existing and the new assignment
+ Map<Integer, LocalAssignment> intersectAssignment = new HashMap<>();
+ for (Map.Entry<Integer, LocalAssignment> entry : newAssignment.entrySet()) {
+ Integer port = entry.getKey();
+ if (existingAssignment.containsKey(port)) {
+ intersectAssignment.put(port, entry.getValue());
+ }
+ }
+
+ for (Integer port : intersectAssignment.keySet()) {
+ List<ExecutorInfo> existExecutors = existingAssignment.get(port).get_executors();
+ List<ExecutorInfo> newExecutors = newAssignment.get(port).get_executors();
- if (newExecutors.size() != existExecutors.size()) {
- syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), vaildPortToWorkerIds.get(port));
- continue;
++ Set<ExecutorInfo> setExitExecutors = new HashSet<>(existExecutors);
++ Set<ExecutorInfo> setNewExecutors = new HashSet<>(newExecutors);
++ // Compare the sets by content. "!=" compares references, and two freshly built sets are
++ // never the same object, so every worker on a retained port was killed on each sync.
++ String workerId = vaildPortToWorkerIds.get(port);
++ // No VALID worker on this port means there is nothing to kill here.
++ if (workerId != null && !setExitExecutors.equals(setNewExecutors)){
++ syncProcesses.killWorker(supervisorData, supervisorData.getWorkerManager(), workerId);
+ }
- for (ExecutorInfo executorInfo : newExecutors) {
- if (!existExecutors.contains(executorInfo)) {
- syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), vaildPortToWorkerIds.get(port));
- break;
- }
- }
-
+ }
+ }
+
+ /**
+  * Builds the assignment snapshot for the given topologies, reusing the locally cached entry
+  * whenever the version reported by the cluster state equals the cached version.
+  *
+  * @param stormClusterState cluster state queried for assignment versions and data
+  * @param stormIds topology ids to include
+  * @param localAssignmentVersion previously fetched entries keyed by topology id
+  * @param callback passed through to the cluster-state calls
+  * @return topology id to assignment entry; topologies with no reported version are omitted
+  * @throws Exception propagated from the cluster-state calls
+  */
+ protected Map<String, Map<String, Object>> getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> stormIds,
+         Map<String, Map<String, Object>> localAssignmentVersion, Runnable callback) throws Exception {
+     Map<String, Map<String, Object>> updateAssignmentVersion = new HashMap<>();
+     for (String stormId : stormIds) {
+         Integer recordedVersion = -1;
+         Integer version = stormClusterState.assignmentVersion(stormId, callback);
+         Map<String, Object> cachedEntry = localAssignmentVersion.get(stormId);
+         if (cachedEntry != null) {
+             recordedVersion = (Integer) cachedEntry.get(IStateStorage.VERSION);
+         }
+         if (version == null) {
+             // No version reported for this topology: leave it out of the snapshot.
+             continue;
+         }
+         // Boxed Integers must be compared by value; == only matches inside the Integer cache range.
+         if (version.equals(recordedVersion)) {
+             updateAssignmentVersion.put(stormId, cachedEntry);
+         } else {
+             Map<String, Object> assignmentVersion = (Map<String, Object>) stormClusterState.assignmentInfoWithVersion(stormId, callback);
+             updateAssignmentVersion.put(stormId, assignmentVersion);
+         }
+     }
+     return updateAssignmentVersion;
+ }
+
+ /**
+  * Collects the profile requests registered in the cluster state for each given topology.
+  *
+  * @param stormClusterState cluster state to query
+  * @param stormIds topology ids to look up
+  * @return topology id mapped to whatever the cluster state returned for it
+  * @throws Exception propagated from the cluster-state call
+  */
+ protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception {
+     Map<String, List<ProfileRequest>> actionsByTopology = new HashMap<String, List<ProfileRequest>>();
+     for (String topologyId : stormIds) {
+         actionsByTopology.put(topologyId, stormClusterState.getTopologyProfileRequests(topologyId));
+     }
+     return actionsByTopology;
+ }
+
+ /**
+  * Extracts the master code directory of every topology that has assignment data in the snapshot.
+  *
+  * @param assignmentsSnapshot topology id to assignment entry (data stored under IStateStorage.DATA)
+  * @return topology id to master code directory; entries without data are skipped
+  */
+ protected Map<String, String> readStormCodeLocations(Map<String, Map<String, Object>> assignmentsSnapshot) {
+     Map<String, String> codeDirByTopology = new HashMap<>();
+     for (Map.Entry<String, Map<String, Object>> snapshotEntry : assignmentsSnapshot.entrySet()) {
+         Assignment data = (Assignment) (snapshotEntry.getValue().get(IStateStorage.DATA));
+         if (data == null) {
+             continue;
+         }
+         codeDirByTopology.put(snapshotEntry.getKey(), data.get_master_code_dir());
+     }
+     return codeDirByTopology;
+ }
+
+ /**
+  * Drops the localizer's reference to every blob listed in the topology's blobstore map.
+  * Does nothing when the topology conf has no blobstore map.
+  *
+  * @param localizer localizer holding the blob references
+  * @param stormId topology whose supervisor-side conf is read
+  * @param conf supervisor configuration
+  * @throws Exception propagated from reading the conf or from the localizer
+  */
+ protected void removeBlobReferences(Localizer localizer, String stormId, Map conf) throws Exception {
+     Map topologyConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+     Map<String, Map<String, Object>> blobs = (Map<String, Map<String, Object>>) topologyConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+     String submitter = (String) topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+     String topologyName = (String) topologyConf.get(Config.TOPOLOGY_NAME);
+     if (blobs == null) {
+         return;
+     }
+     for (Map.Entry<String, Map<String, Object>> blob : blobs.entrySet()) {
+         boolean uncompress = SupervisorUtils.shouldUncompressBlob(blob.getValue());
+         localizer.removeBlobReference(blob.getKey(), submitter, topologyName, uncompress);
+     }
+ }
+
+ /**
+  * Removes the supervisor-side files of a topology, optionally releasing its blob references
+  * first. Any exception raised while removing is logged and not rethrown.
+  *
+  * @param conf supervisor configuration
+  * @param stormId topology whose dist root is removed
+  * @param localizer localizer used when blob references are released
+  * @param isrmBlobRefs whether to release the topology's blob references before deleting
+  * @throws IOException declared by the signature; may come from resolving the dist root
+  */
+ protected void rmTopoFiles(Map conf, String stormId, Localizer localizer, boolean isrmBlobRefs) throws IOException {
+     String path = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+     try {
+         if (isrmBlobRefs) {
+             removeBlobReferences(localizer, stormId, conf);
+         }
+         boolean runAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+         if (!runAsUser) {
+             Utils.forceDelete(ConfigUtils.supervisorStormDistRoot(conf, stormId));
+         } else {
+             SupervisorUtils.rmrAsUser(conf, stormId, path);
+         }
+     } catch (Exception e) {
+         LOG.info("Exception removing: {} ", stormId, e);
+     }
+ }
+
+ /**
+  * Checks that the required files exist for every downloaded topology that is still assigned.
+  * The files of a topology failing the check are removed (blob references are kept) and its id
+  * is reported back.
+  *
+  * @param conf supervisor configuration
+  * @param localizer localizer handed to the file removal
+  * @param assignedStormIds topology ids currently assigned
+  * @param allDownloadedTopologyIds topology ids that have been downloaded
+  * @return ids of assigned topologies whose required files were missing
+  * @throws IOException propagated from the existence check or the removal
+  */
+ protected Set<String> verifyDownloadedFiles(Map conf, Localizer localizer, Set<String> assignedStormIds, Set<String> allDownloadedTopologyIds)
+         throws IOException {
+     Set<String> brokenTopologyIds = new HashSet<>();
+     for (String topologyId : allDownloadedTopologyIds) {
+         if (!assignedStormIds.contains(topologyId)) {
+             continue;
+         }
+         if (SupervisorUtils.doRequiredTopoFilesExist(conf, topologyId)) {
+             continue;
+         }
+         LOG.debug("Files not present in topology directory");
+         rmTopoFiles(conf, topologyId, localizer, false);
+         brokenTopologyIds.add(topologyId);
+     }
+     return brokenTopologyIds;
+ }
+
+ /**
+  * Downloads the topology code, dispatching on the configured cluster mode: a mode ending in
+  * "distributed" uses the distributed download, one ending in "local" uses the local download,
+  * and any other mode downloads nothing.
+  *
+  * @param conf supervisor configuration
+  * @param stormId topology to download
+  * @param masterCodeDir master code directory recorded in the assignment
+  * @param localizer localizer passed to the selected download routine
+  * @throws Exception propagated from the selected download routine
+  */
+ private void downloadStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception {
+     String mode = ConfigUtils.clusterMode(conf);
+
+     if (mode.endsWith("distributed")) {
+         downloadDistributeStormCode(conf, stormId, masterCodeDir, localizer);
+         return;
+     }
+     if (mode.endsWith("local")) {
+         downloadLocalStormCode(conf, stormId, masterCodeDir, localizer);
+     }
+ }
+
+ /**
+  * Local-mode download: reads the topology code and conf blobs into a temporary directory,
+  * moves that directory to the topology's dist root, then fills the resources sub-directory
+  * from a classpath jar or from the context class loader's resources.
+  *
+  * @param conf supervisor configuration
+  * @param stormId topology to download
+  * @param masterCodeDir directory handed to the nimbus blob store
+  * @param localizer unused by this method
+  * @throws Exception propagated from blob, file or jar operations
+  */
+ private void downloadLocalStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception {
+
+     String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
+     String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+     BlobStore blobStore = Utils.getNimbusBlobStore(conf, masterCodeDir, null);
+     try {
+         FileUtils.forceMkdir(new File(tmproot));
+         String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
+         String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
+         String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
+         String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
+         // Close each output stream as soon as its blob is written so file handles are not leaked.
+         try (FileOutputStream codeOut = new FileOutputStream(codePath)) {
+             blobStore.readBlobTo(stormCodeKey, codeOut, null);
+         }
+         try (FileOutputStream confOut = new FileOutputStream(confPath)) {
+             blobStore.readBlobTo(stormConfKey, confOut, null);
+         }
+     } finally {
+         blobStore.shutdown();
+     }
+     FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+     SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
+     ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+
+     String resourcesJar = resourcesJar();
+
+     URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR);
+
+     String targetDir = stormroot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
+
+     if (resourcesJar != null) {
+         LOG.info("Extracting resources from jar at {} to {}", resourcesJar, targetDir);
+         Utils.extractDirFromJar(resourcesJar, ConfigUtils.RESOURCES_SUBDIR, stormroot);
+     } else if (url != null) {
+
+         LOG.info("Copying resources at {} to {} ", url.toString(), targetDir);
+         // Strings must be compared by value; == only matches when both sides are the same object.
+         if ("jar".equals(url.getProtocol())) {
+             JarURLConnection urlConnection = (JarURLConnection) url.openConnection();
+             Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), ConfigUtils.RESOURCES_SUBDIR, stormroot);
+         } else {
+             FileUtils.copyDirectory(new File(url.getFile()), (new File(targetDir)));
+         }
+     }
+ }
+
+ /**
+  * Distributed-mode download: fetches the topology jar, code and conf into a temporary
+  * directory, extracts the resources, downloads the topology's blobs and, only if all blobs are
+  * present, moves the temporary directory to the permanent location with an atomic move.
+  * On blob failure the temporary directory is deleted instead.
+  *
+  * @param conf supervisor configuration
+  * @param stormId topology to download
+  * @param masterCodeDir unused by this method
+  * @param localizer localizer used to download the topology's blobs
+  * @throws Exception propagated from blob-store, file or jar operations
+  */
+ private void downloadDistributeStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception {
+
+     String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
+     String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+     String stormJarKey = ConfigUtils.masterStormJarKey(stormId);
+     String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
+     String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
+     String jarPath = ConfigUtils.supervisorStormJarPath(tmproot);
+     String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
+     String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
+     ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(conf);
+     try {
+         FileUtils.forceMkdir(new File(tmproot));
+         if (Utils.isOnWindows()) {
+             if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+                 throw new RuntimeException("ERROR: Windows doesn't implement setting the correct permissions");
+             }
+         } else {
+             Utils.restrictPermissions(tmproot);
+         }
+         Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore);
+         Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore);
+         Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, blobStore);
+     } finally {
+         // Release the blob-store client even when a download or the permission setup fails.
+         blobStore.shutdown();
+     }
+     Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, tmproot);
+     downloadBlobsForTopology(conf, confPath, localizer, tmproot);
+     if (IsDownloadBlobsForTopologySucceed(confPath, tmproot)) {
+         LOG.info("Successfully downloaded blob resources for storm-id {}", stormId);
+         FileUtils.forceMkdir(new File(stormroot));
+         Files.move(new File(tmproot).toPath(), new File(stormroot).toPath(), StandardCopyOption.ATOMIC_MOVE);
+         SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
+     } else {
+         // The placeholder was missing, so the storm id never reached the log.
+         LOG.info("Failed to download blob resources for storm-id {}", stormId);
+         Utils.forceDelete(tmproot);
+     }
+ }
+
+ /**
+  * Reports whether every blob listed in the topology conf is present under the target directory.
+  * A blob is looked up by its "localname" when the blob info defines one, otherwise by its key.
+  *
+  * @param stormconfPath path of the compressed JSON topology conf to read
+  * @param targetDir directory in which the blob entries are expected
+  * @return true when no blobs are configured or all of them exist; false otherwise
+  * @throws IOException if the conf file cannot be read
+  */
+ protected boolean IsDownloadBlobsForTopologySucceed(String stormconfPath, String targetDir) throws IOException {
+     Map stormConf = Utils.fromCompressedJsonConf(FileUtils.readFileToByteArray(new File(stormconfPath)));
+     Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+     if (blobstoreMap == null) {
+         return true;
+     }
+     for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
+         Map<String, Object> blobInfo = entry.getValue();
+         String blobFileName;
+         if (blobInfo != null && blobInfo.containsKey("localname")) {
+             blobFileName = (String) blobInfo.get("localname");
+         } else {
+             blobFileName = entry.getKey();
+         }
+         // Resolve against targetDir: the bare name would be checked relative to the process
+         // working directory, not the directory the blobs were linked into.
+         if (!Utils.checkFileExists(targetDir + Utils.FILE_PATH_SEPARATOR + blobFileName)) {
+             return false;
+         }
+     }
+     return true;
+ }
+
+ /**
+  * Downloads all blobs listed in the topology configuration and creates, under the temporary
+  * root, one symlink per blob named after its "localname" (or its key when no localname is set).
+  * Authorization and missing-key failures are logged and not rethrown.
+  *
+  * @param conf supervisor configuration
+  * @param stormconfPath path of the topology conf to read
+  * @param localizer localizer that fetches the blobs
+  * @param tmpRoot directory in which the symlinks are created
+  * @throws IOException propagated from conf reading, directory creation or symlink creation
+  */
+ protected void downloadBlobsForTopology(Map conf, String stormconfPath, Localizer localizer, String tmpRoot) throws IOException {
+     Map stormConf = ConfigUtils.readSupervisorStormConfGivenPath(conf, stormconfPath);
+     Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+     String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+     String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+     File userDir = localizer.getLocalUserFileCacheDir(user);
+     List<LocalResource> localResourceList = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+     if (localResourceList.size() > 0) {
+         if (!userDir.exists()) {
+             FileUtils.forceMkdir(userDir);
+         }
+         try {
+             List<LocalizedResource> localizedResources = localizer.getBlobs(localResourceList, user, topoName, userDir);
+             setupBlobPermission(conf, user, userDir.toString());
+             for (LocalizedResource localizedResource : localizedResources) {
+                 File rsrcFilePath = new File(localizedResource.getFilePath());
+                 String keyName = rsrcFilePath.getName();
+                 String blobSymlinkTargetName = new File(localizedResource.getCurrentSymlinkPath()).getName();
+
+                 // Default to the key name so the symlink name is never null.
+                 String symlinkName = keyName;
+                 Map<String, Object> blobInfo = (blobstoreMap != null) ? blobstoreMap.get(keyName) : null;
+                 if (blobInfo != null && blobInfo.containsKey("localname")) {
+                     symlinkName = (String) blobInfo.get("localname");
+                 }
+                 Utils.createSymlink(tmpRoot, rsrcFilePath.getParent(), symlinkName, blobSymlinkTargetName);
+             }
+         } catch (AuthorizationException authExp) {
+             // A trailing Throwable is logged as the exception, so the message needs no placeholder.
+             LOG.error("AuthorizationException error", authExp);
+         } catch (KeyNotFoundException knf) {
+             LOG.error("KeyNotFoundException error", knf);
+         }
+     }
+ }
+
+ /**
+  * Runs the "blob" launcher command on the given path when the supervisor is configured to run
+  * workers as the submitting user; otherwise does nothing.
+  *
+  * @param conf supervisor configuration
+  * @param user user passed to the launcher
+  * @param path path passed to the launcher
+  * @throws IOException propagated from the launcher invocation
+  */
+ protected void setupBlobPermission(Map conf, String user, String path) throws IOException {
+     // Read the setting from conf; passing the key constant itself hands getBoolean the key
+     // name (a String) rather than the configured value.
+     if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+         String logPrefix = "setup blob permissions for " + path;
+         SupervisorUtils.processLauncherAndWait(conf, user, Arrays.asList("blob", path), null, logPrefix);
+     }
+
+ }
+
+ /**
+  * Finds the first jar on the current classpath that contains the resources sub-directory.
+  *
+  * @return path of the first matching jar, or null when the classpath is unavailable or no jar matches
+  * @throws IOException propagated from inspecting a jar
+  */
+ private String resourcesJar() throws IOException {
+
+     String path = Utils.currentClasspath();
+     if (path == null) {
+         return null;
+     }
+     for (String classpathEntry : path.split(File.pathSeparator)) {
+         if (!classpathEntry.endsWith(".jar")) {
+             continue;
+         }
+         // Only the first match is ever used, so stop at it instead of opening every remaining jar.
+         if (Utils.zipDoesContainDir(classpathEntry, ConfigUtils.RESOURCES_SUBDIR)) {
+             return classpathEntry;
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Merges the per-topology executor assignments of this supervisor into one port-keyed map.
+  * Two topologies claiming the same port raise a RuntimeException. A RuntimeException is
+  * tolerated while the retry counter is at most 2, in which case the counter is incremented
+  * and the existing assignment is returned; beyond that it is rethrown. Success resets the counter.
+  *
+  * @param assignmentsSnapshot topology id to assignment entry (data under IStateStorage.DATA)
+  * @param existingAssignment assignment returned when a failure is tolerated
+  * @param assignmentId node id used to select this supervisor's executors
+  * @param retries failure counter shared across calls
+  * @return port to local assignment, or existingAssignment after a tolerated failure
+  */
+ protected Map<Integer, LocalAssignment> readAssignments(Map<String, Map<String, Object>> assignmentsSnapshot,
+         Map<Integer, LocalAssignment> existingAssignment, String assignmentId, AtomicInteger retries) {
+     try {
+         Map<Integer, LocalAssignment> assignmentByPort = new HashMap<Integer, LocalAssignment>();
+         for (Map.Entry<String, Map<String, Object>> snapshotEntry : assignmentsSnapshot.entrySet()) {
+             Assignment assignment = (Assignment) snapshotEntry.getValue().get(IStateStorage.DATA);
+             Map<Integer, LocalAssignment> mine = readMyExecutors(snapshotEntry.getKey(), assignmentId, assignment);
+
+             for (Map.Entry<Integer, LocalAssignment> slot : mine.entrySet()) {
+                 Integer port = slot.getKey();
+                 if (assignmentByPort.containsKey(port)) {
+                     throw new RuntimeException("Should not have multiple topologys assigned to one port");
+                 }
+                 assignmentByPort.put(port, slot.getValue());
+             }
+         }
+         retries.set(0);
+         return assignmentByPort;
+     } catch (RuntimeException e) {
+         if (retries.get() > 2) {
+             throw e;
+         }
+         retries.incrementAndGet();
+         LOG.warn("{} : retrying {} of 3", e.getMessage(), retries.get());
+         return existingAssignment;
+     }
+ }
+
+ /**
+  * Builds, for one topology, the port-keyed local assignments of the executors placed on the
+  * node identified by assignmentId, attaching the worker resources recorded for each port.
+  *
+  * @param stormId topology id stored in each created LocalAssignment
+  * @param assignmentId node id to match against each NodeInfo
+  * @param assignment topology assignment to read
+  * @return port to local assignment for this node; empty when nothing matches
+  */
+ protected Map<Integer, LocalAssignment> readMyExecutors(String stormId, String assignmentId, Assignment assignment) {
+     Map<Integer, LocalAssignment> portTasks = new HashMap<>();
+
+     // First pass: resources per port for slots on this node.
+     Map<Long, WorkerResources> resourcesByPort = new HashMap<>();
+     Map<NodeInfo, WorkerResources> workerResources = assignment.get_worker_resources();
+     if (workerResources != null) {
+         for (Map.Entry<NodeInfo, WorkerResources> resourceEntry : workerResources.entrySet()) {
+             NodeInfo slot = resourceEntry.getKey();
+             if (!slot.get_node().equals(assignmentId)) {
+                 continue;
+             }
+             for (Long port : slot.get_port()) {
+                 resourcesByPort.put(port, resourceEntry.getValue());
+             }
+         }
+     }
+
+     // Second pass: group this node's executors by port.
+     Map<List<Long>, NodeInfo> executorNodePort = assignment.get_executor_node_port();
+     if (executorNodePort == null) {
+         return portTasks;
+     }
+     for (Map.Entry<List<Long>, NodeInfo> placement : executorNodePort.entrySet()) {
+         NodeInfo slot = placement.getValue();
+         if (!slot.get_node().equals(assignmentId)) {
+             continue;
+         }
+         List<Long> executor = placement.getKey();
+         for (Long port : slot.get_port()) {
+             LocalAssignment localAssignment = portTasks.get(port.intValue());
+             if (localAssignment == null) {
+                 localAssignment = new LocalAssignment(stormId, new ArrayList<ExecutorInfo>());
+                 if (resourcesByPort.containsKey(port)) {
+                     localAssignment.set_resources(resourcesByPort.get(port));
+                 }
+                 portTasks.put(port.intValue(), localAssignment);
+             }
+             // An executor is described by the first and last element of its id list.
+             int first = executor.get(0).intValue();
+             int last = executor.get(executor.size() - 1).intValue();
+             localAssignment.get_executors().add(new ExecutorInfo(first, last));
+         }
+     }
+     return portTasks;
+ }
+
+ // I know it's not a good idea to create a SyncProcessEvent here, but the intent is that SyncProcessEvent is only
+ // responsible for starting/shutting down workers, while SyncSupervisorEvent is responsible for downloading/removing
+ // topologies' binaries.
+ /**
+  * Kills every local worker whose heartbeat state is DISALLOWED.
+  *
+  * @throws Exception propagated from the local-state read, heartbeat lookup or worker kill
+  */
+ protected void shutdownDisallowedWorkers() throws Exception {
+     LocalState localState = supervisorData.getLocalState();
+     Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
+     if (assignedExecutors == null) {
+         assignedExecutors = new HashMap<>();
+     }
+     int now = Time.currentTimeSecs();
+     Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now);
+     // The placeholder was missing, so the assignment map never reached the log.
+     LOG.debug("Allocated workers {}", assignedExecutors);
+     for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()) {
+         String workerId = entry.getKey();
+         StateHeartbeat stateHeartbeat = entry.getValue();
+         // Guard against a null entry, as the VALID-state scan elsewhere in this class does.
+         if (stateHeartbeat != null && stateHeartbeat.getState() == State.DISALLOWED) {
- syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), workerId);
++            syncProcesses.killWorker(supervisorData, supervisorData.getWorkerManager(), workerId);
+             // The "{}" placeholder previously had no argument to fill it.
+             LOG.debug("{}'s state disallowed, so shutdown this worker", workerId);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/75364892/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 4137e94,0000000..fd357c0
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@@ -1,87 -1,0 +1,87 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Runnable that assembles a {@link SupervisorInfo} from the supervisor's current state and
+ * configuration and reports it through {@code IStormClusterState.supervisorHeartbeat}.
+ */
+public class SupervisorHeartbeat implements Runnable {
+
+     private final IStormClusterState stormClusterState;
+     private final String supervisorId;
+     private final Map conf;
+     private final SupervisorData supervisorData;
+
+     /**
+      * @param conf supervisor configuration; read for scheduler meta and capacities
+      * @param supervisorData supervisor state; also supplies the cluster state and supervisor id
+      */
+     public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
+         this.stormClusterState = supervisorData.getStormClusterState();
+         this.supervisorId = supervisorData.getSupervisorId();
+         this.supervisorData = supervisorData;
+         this.conf = conf;
+     }
+
+     /**
+      * Builds the heartbeat payload: time, host, assignment id, used ports, port metadata,
+      * scheduler meta, uptime, version and resource capacities.
+      */
- private SupervisorInfo update(Map conf, SupervisorData supervisorData) {
++    private SupervisorInfo buildSupervisorInfo(Map conf, SupervisorData supervisorData) {
+         SupervisorInfo supervisorInfo = new SupervisorInfo();
+         supervisorInfo.set_time_secs(Time.currentTimeSecs());
+         supervisorInfo.set_hostname(supervisorData.getHostName());
+         supervisorInfo.set_assignment_id(supervisorData.getAssignmentId());
+
+         // Used ports are the keys of the current assignment.
+         List<Long> usedPorts = new ArrayList<>();
+         usedPorts.addAll(supervisorData.getCurrAssignment().get().keySet());
+         supervisorInfo.set_used_ports(usedPorts);
+         List metaDatas = (List) supervisorData.getiSupervisor().getMetadata();
+         List<Long> portList = new ArrayList<>();
+         if (metaDatas != null) {
+             for (Object data : metaDatas) {
+                 Integer port = Utils.getInt(data);
+                 if (port != null) {
+                     portList.add(port.longValue());
+                 }
+             }
+         }
+
+         supervisorInfo.set_meta(portList);
+         supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
+         supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime());
+         supervisorInfo.set_version(supervisorData.getStormVersion());
+         supervisorInfo.set_resources_map(mkSupervisorCapacities(conf));
+         return supervisorInfo;
+     }
+
+     /**
+      * Reads the configured memory and CPU capacities into a map keyed by their config names.
+      */
+     private Map<String, Double> mkSupervisorCapacities(Map conf) {
+         Map<String, Double> ret = new HashMap<String, Double>();
+         // Go through Number so an integer-valued setting is accepted; a direct cast to double
+         // only succeeds when the stored value is already a Double.
+         Double mem = ((Number) conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB)).doubleValue();
+         ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
+         Double cpu = ((Number) conf.get(Config.SUPERVISOR_CPU_CAPACITY)).doubleValue();
+         ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
+         return ret;
+     }
+
+     /** Builds the current supervisor info and sends it as this supervisor's heartbeat. */
+     @Override
+     public void run() {
- SupervisorInfo supervisorInfo = update(conf, supervisorData);
++        SupervisorInfo supervisorInfo = buildSupervisorInfo(conf, supervisorData);
+         stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
+     }
+}