You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:56 UTC

[33/35] storm git commit: fix about RunProfilerActions

fix about RunProfilerActions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ac9942cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ac9942cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ac9942cf

Branch: refs/heads/master
Commit: ac9942cfee18dfb29dae1787c3f36f978cdcfa85
Parents: dba69b5 1b4edf4
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Apr 1 09:03:46 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Apr 1 09:21:47 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   6 +
 docs/Trident-API-Overview.md                    | 100 ++++
 examples/storm-starter/pom.xml                  |  16 +-
 external/flux/flux-examples/pom.xml             |  13 +-
 external/flux/pom.xml                           |  13 +-
 external/sql/storm-sql-kafka/pom.xml            |  16 +-
 external/storm-kafka-client/README.md           |   9 +
 external/storm-kafka-client/pom.xml             |  86 +++
 .../apache/storm/kafka/spout/KafkaSpout.java    | 547 +++++++++++++++++++
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 309 +++++++++++
 .../storm/kafka/spout/KafkaSpoutMessageId.java  | 101 ++++
 .../KafkaSpoutRetryExponentialBackoff.java      | 281 ++++++++++
 .../kafka/spout/KafkaSpoutRetryService.java     |  72 +++
 .../storm/kafka/spout/KafkaSpoutStream.java     |  70 +++
 .../storm/kafka/spout/KafkaSpoutStreams.java    | 158 ++++++
 .../kafka/spout/KafkaSpoutTupleBuilder.java     |  58 ++
 .../kafka/spout/KafkaSpoutTuplesBuilder.java    |  82 +++
 .../kafka/spout/test/KafkaSpoutTestBolt.java    |  50 ++
 .../spout/test/KafkaSpoutTopologyMain.java      | 133 +++++
 .../spout/test/TopicTest2TupleBuilder.java      |  40 ++
 .../test/TopicsTest0Test1TupleBuilder.java      |  42 ++
 external/storm-kafka/pom.xml                    |  16 +-
 .../storm/mongodb/bolt/MongoInsertBolt.java     |  74 ++-
 .../storm/mongodb/bolt/MongoUpdateBolt.java     |   3 +-
 .../storm/mongodb/common/MongoDBClient.java     |  20 +-
 .../storm/mongodb/trident/state/MongoState.java |   2 +-
 external/storm-solr/pom.xml                     |   8 +-
 pom.xml                                         |  57 +-
 storm-core/pom.xml                              |   6 -
 storm-core/src/clj/org/apache/storm/config.clj  |  27 -
 .../clj/org/apache/storm/daemon/executor.clj    | 222 +++-----
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |   3 +-
 .../src/clj/org/apache/storm/daemon/task.clj    | 190 -------
 .../org/apache/storm/daemon/GrouperFactory.java | 243 ++++++++
 .../src/jvm/org/apache/storm/daemon/Task.java   | 247 +++++++++
 .../daemon/metrics/BuiltinMetricsUtil.java      |   8 +-
 .../supervisor/timer/RunProfilerActions.java    |  15 +-
 .../apache/storm/hooks/info/BoltAckInfo.java    |   8 +
 .../storm/hooks/info/BoltExecuteInfo.java       |   8 +
 .../apache/storm/hooks/info/BoltFailInfo.java   |   8 +
 .../org/apache/storm/hooks/info/EmitInfo.java   |   9 +
 .../apache/storm/hooks/info/SpoutAckInfo.java   |   9 +
 .../apache/storm/hooks/info/SpoutFailInfo.java  |   9 +
 .../jvm/org/apache/storm/stats/StatsUtil.java   |  23 +-
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  35 +-
 .../test/clj/org/apache/storm/grouping_test.clj |  19 +-
 storm-dist/binary/src/main/assembly/binary.xml  |  14 +
 47 files changed, 2985 insertions(+), 500 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ac9942cf/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
index 04467c2,0000000..3e1e34d
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@@ -1,214 -1,0 +1,211 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.storm.daemon.supervisor.timer;
 +
 +import com.google.common.collect.Lists;
 +import org.apache.storm.Config;
 +import org.apache.storm.cluster.IStormClusterState;
 +import org.apache.storm.daemon.supervisor.SupervisorData;
 +import org.apache.storm.daemon.supervisor.SupervisorUtils;
 +import org.apache.storm.generated.ProfileAction;
 +import org.apache.storm.generated.ProfileRequest;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.Utils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.BufferedReader;
 +import java.io.File;
 +import java.io.FileReader;
 +import java.io.IOException;
 +import java.util.*;
 +
 +public class RunProfilerActions implements Runnable {
 +    private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class);
 +
 +    private Map conf;
 +    private IStormClusterState stormClusterState;
 +    private String hostName;
 +
 +    private String profileCmd;
 +
 +    private SupervisorData supervisorData;
 +
 +    private class ActionExitCallback implements Utils.ExitCodeCallable {
 +        private String stormId;
 +        private ProfileRequest profileRequest;
 +        private String logPrefix;
 +        private boolean stop;
 +
 +        public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix, boolean stop) {
 +            this.stormId = stormId;
 +            this.profileRequest = profileRequest;
 +            this.logPrefix = logPrefix;
 +            this.stop = stop;
 +        }
 +
 +        @Override
 +        public Object call() throws Exception {
 +            return null;
 +        }
 +
 +        @Override
 +        public Object call(int exitCode) {
 +            LOG.info("{} profile-action exited for {}", logPrefix, exitCode);
 +            try {
 +                if (stop)
 +                    stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
 +            } catch (Exception e) {
 +                LOG.warn("failed delete profileRequest: " + profileRequest);
 +            }
 +            return null;
 +        }
 +    }
 +
 +    public RunProfilerActions(SupervisorData supervisorData) {
 +        this.conf = supervisorData.getConf();
 +        this.stormClusterState = supervisorData.getStormClusterState();
 +        this.hostName = supervisorData.getHostName();
-         this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND));
++        String stormHome = System.getProperty("storm.home");
++        this.profileCmd = stormHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + conf.get(Config.WORKER_PROFILER_COMMAND);
 +        this.supervisorData = supervisorData;
 +    }
 +
 +    @Override
 +    public void run() {
 +        Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfilerActions().get();
 +        try {
 +            for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
 +                String stormId = entry.getKey();
 +                List<ProfileRequest> requests = entry.getValue();
 +                if (requests != null) {
 +                    for (ProfileRequest profileRequest : requests) {
 +                        if (profileRequest.get_nodeInfo().get_node().equals(hostName)) {
 +                            boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp();
 +                            Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
-                             String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
++                            String targetDir = ConfigUtils.workerArtifactsRoot(conf, stormId, port.intValue());
 +                            Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +
 +                            String user = null;
 +                            if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
 +                                user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
 +                            }
 +                            Map<String, String> env = null;
 +                            if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) {
 +                                env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
 +                            } else {
 +                                env = new HashMap<String, String>();
 +                            }
 +
 +                            String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
 +                            StringBuilder stringBuilder = new StringBuilder();
 +
-                             try (FileReader reader = new FileReader(str);
-                                  BufferedReader br = new BufferedReader(reader)) {
-                                 int c;
-                                 while ((c = br.read()) >= 0) {
-                                     stringBuilder.append(c);
-                                 }
++                            String workerPid = null;
++                            try (FileReader reader = new FileReader(str); BufferedReader br = new BufferedReader(reader)) {
++                                workerPid = br.readLine().trim();
 +                            }
-                             String workerPid = stringBuilder.toString().trim();
 +                            ProfileAction profileAction = profileRequest.get_action();
 +                            String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
 +
 +                            // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
 +                            // The profiler plugin script validates if JVM is recording before starting another recording.
 +                            List<String> command = mkCommand(profileAction, stop, workerPid, targetDir);
 +                            try {
 +                                ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix, stop);
 +                                launchProfilerActionForWorker(user, targetDir, command, env, actionExitCallback, logPrefix);
 +                            } catch (IOException e) {
 +                                LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
 +                            } catch (RuntimeException e) {
 +                                LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
 +                            }
 +                        }
 +                    }
 +                }
 +            }
 +        } catch (Exception e) {
 +            LOG.error("Error running profiler actions, will retry again later");
 +        }
 +    }
 +
 +    private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
 +            final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
 +        File targetFile = new File(targetDir);
 +        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +            LOG.info("Running as user:{} command:{}", user, commands);
 +            String containerFile = Utils.containerFilePath(targetDir);
 +            if (Utils.checkFileExists(containerFile)) {
 +                SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
 +            }
 +            String scriptFile = Utils.scriptFilePath(targetDir);
 +            if (Utils.checkFileExists(scriptFile)) {
 +                SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
 +            }
 +            String script = Utils.writeScript(targetDir, commands, environment);
 +            List<String> args = new ArrayList<>();
 +            args.add("profiler");
 +            args.add(targetDir);
 +            args.add(script);
 +            SupervisorUtils.processLauncher(conf, user, null, args, environment, logPrefix, exitCodeCallable, targetFile);
 +        } else {
 +            Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
 +        }
 +    }
 +
 +    private List<String> mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
 +        if (action == ProfileAction.JMAP_DUMP) {
 +            return jmapDumpCmd(workerPid, targetDir);
 +        } else if (action == ProfileAction.JSTACK_DUMP) {
 +            return jstackDumpCmd(workerPid, targetDir);
 +        } else if (action == ProfileAction.JPROFILE_DUMP) {
 +            return jprofileDump(workerPid, targetDir);
 +        } else if (action == ProfileAction.JVM_RESTART) {
 +            return jprofileJvmRestart(workerPid);
 +        } else if (!stop && action == ProfileAction.JPROFILE_STOP) {
 +            return jprofileStart(workerPid);
 +        } else if (stop && action == ProfileAction.JPROFILE_STOP) {
 +            return jprofileStop(workerPid, targetDir);
 +        }
 +        return Lists.newArrayList();
 +    }
 +
 +    private List<String> jmapDumpCmd(String pid, String targetDir) {
 +        return Lists.newArrayList(profileCmd, pid, "jmap", targetDir);
 +    }
 +
 +    private List<String> jstackDumpCmd(String pid, String targetDir) {
 +        return Lists.newArrayList(profileCmd, pid, "jstack", targetDir);
 +    }
 +
 +    private List<String> jprofileStart(String pid) {
 +        return Lists.newArrayList(profileCmd, pid, "start");
 +    }
 +
 +    private List<String> jprofileStop(String pid, String targetDir) {
 +        return Lists.newArrayList(profileCmd, pid, "stop", targetDir);
 +    }
 +
 +    private List<String> jprofileDump(String pid, String targetDir) {
 +        return Lists.newArrayList(profileCmd, pid, "dump", targetDir);
 +    }
 +
 +    private List<String> jprofileJvmRestart(String pid) {
 +        return Lists.newArrayList(profileCmd, pid, "kill");
 +    }
 +
 +}