You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/01 16:07:56 UTC
[33/35] storm git commit: fix about RunProfilerActions
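Fix RunProfilerActions: resolve the profiler command under ${storm.home}/bin, pass the topology id and port to ConfigUtils.workerArtifactsRoot, and read the worker PID with readLine().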
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ac9942cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ac9942cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ac9942cf
Branch: refs/heads/master
Commit: ac9942cfee18dfb29dae1787c3f36f978cdcfa85
Parents: dba69b5 1b4edf4
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Apr 1 09:03:46 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Apr 1 09:21:47 2016 +0800
----------------------------------------------------------------------
CHANGELOG.md | 6 +
docs/Trident-API-Overview.md | 100 ++++
examples/storm-starter/pom.xml | 16 +-
external/flux/flux-examples/pom.xml | 13 +-
external/flux/pom.xml | 13 +-
external/sql/storm-sql-kafka/pom.xml | 16 +-
external/storm-kafka-client/README.md | 9 +
external/storm-kafka-client/pom.xml | 86 +++
.../apache/storm/kafka/spout/KafkaSpout.java | 547 +++++++++++++++++++
.../storm/kafka/spout/KafkaSpoutConfig.java | 309 +++++++++++
.../storm/kafka/spout/KafkaSpoutMessageId.java | 101 ++++
.../KafkaSpoutRetryExponentialBackoff.java | 281 ++++++++++
.../kafka/spout/KafkaSpoutRetryService.java | 72 +++
.../storm/kafka/spout/KafkaSpoutStream.java | 70 +++
.../storm/kafka/spout/KafkaSpoutStreams.java | 158 ++++++
.../kafka/spout/KafkaSpoutTupleBuilder.java | 58 ++
.../kafka/spout/KafkaSpoutTuplesBuilder.java | 82 +++
.../kafka/spout/test/KafkaSpoutTestBolt.java | 50 ++
.../spout/test/KafkaSpoutTopologyMain.java | 133 +++++
.../spout/test/TopicTest2TupleBuilder.java | 40 ++
.../test/TopicsTest0Test1TupleBuilder.java | 42 ++
external/storm-kafka/pom.xml | 16 +-
.../storm/mongodb/bolt/MongoInsertBolt.java | 74 ++-
.../storm/mongodb/bolt/MongoUpdateBolt.java | 3 +-
.../storm/mongodb/common/MongoDBClient.java | 20 +-
.../storm/mongodb/trident/state/MongoState.java | 2 +-
external/storm-solr/pom.xml | 8 +-
pom.xml | 57 +-
storm-core/pom.xml | 6 -
storm-core/src/clj/org/apache/storm/config.clj | 27 -
.../clj/org/apache/storm/daemon/executor.clj | 222 +++-----
.../src/clj/org/apache/storm/daemon/nimbus.clj | 3 +-
.../src/clj/org/apache/storm/daemon/task.clj | 190 -------
.../org/apache/storm/daemon/GrouperFactory.java | 243 ++++++++
.../src/jvm/org/apache/storm/daemon/Task.java | 247 +++++++++
.../daemon/metrics/BuiltinMetricsUtil.java | 8 +-
.../supervisor/timer/RunProfilerActions.java | 15 +-
.../apache/storm/hooks/info/BoltAckInfo.java | 8 +
.../storm/hooks/info/BoltExecuteInfo.java | 8 +
.../apache/storm/hooks/info/BoltFailInfo.java | 8 +
.../org/apache/storm/hooks/info/EmitInfo.java | 9 +
.../apache/storm/hooks/info/SpoutAckInfo.java | 9 +
.../apache/storm/hooks/info/SpoutFailInfo.java | 9 +
.../jvm/org/apache/storm/stats/StatsUtil.java | 23 +-
.../jvm/org/apache/storm/utils/ConfigUtils.java | 35 +-
.../test/clj/org/apache/storm/grouping_test.clj | 19 +-
storm-dist/binary/src/main/assembly/binary.xml | 14 +
47 files changed, 2985 insertions(+), 500 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ac9942cf/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
index 04467c2,0000000..3e1e34d
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
@@@ -1,214 -1,0 +1,211 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+
+public class RunProfilerActions implements Runnable {
+ private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class);
+
+ private Map conf;
+ private IStormClusterState stormClusterState;
+ private String hostName;
+
+ private String profileCmd;
+
+ private SupervisorData supervisorData;
+
+ private class ActionExitCallback implements Utils.ExitCodeCallable {
+ private String stormId;
+ private ProfileRequest profileRequest;
+ private String logPrefix;
+ private boolean stop;
+
+ public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix, boolean stop) {
+ this.stormId = stormId;
+ this.profileRequest = profileRequest;
+ this.logPrefix = logPrefix;
+ this.stop = stop;
+ }
+
+ @Override
+ public Object call() throws Exception {
+ return null;
+ }
+
+ @Override
+ public Object call(int exitCode) {
+ LOG.info("{} profile-action exited for {}", logPrefix, exitCode);
+ try {
+ if (stop)
+ stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
+ } catch (Exception e) {
+ LOG.warn("failed delete profileRequest: " + profileRequest);
+ }
+ return null;
+ }
+ }
+
+ public RunProfilerActions(SupervisorData supervisorData) {
+ this.conf = supervisorData.getConf();
+ this.stormClusterState = supervisorData.getStormClusterState();
+ this.hostName = supervisorData.getHostName();
- this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND));
++ String stormHome = System.getProperty("storm.home");
++ this.profileCmd = stormHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + conf.get(Config.WORKER_PROFILER_COMMAND);
+ this.supervisorData = supervisorData;
+ }
+
+ @Override
+ public void run() {
+ Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfilerActions().get();
+ try {
+ for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
+ String stormId = entry.getKey();
+ List<ProfileRequest> requests = entry.getValue();
+ if (requests != null) {
+ for (ProfileRequest profileRequest : requests) {
+ if (profileRequest.get_nodeInfo().get_node().equals(hostName)) {
+ boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp();
+ Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
- String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
++ String targetDir = ConfigUtils.workerArtifactsRoot(conf, stormId, port.intValue());
+ Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+ String user = null;
+ if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
+ user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
+ }
+ Map<String, String> env = null;
+ if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) {
+ env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+ } else {
+ env = new HashMap<String, String>();
+ }
+
+ String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
+ StringBuilder stringBuilder = new StringBuilder();
+
- try (FileReader reader = new FileReader(str);
- BufferedReader br = new BufferedReader(reader)) {
- int c;
- while ((c = br.read()) >= 0) {
- stringBuilder.append(c);
- }
++ String workerPid = null;
++ try (FileReader reader = new FileReader(str); BufferedReader br = new BufferedReader(reader)) {
++ workerPid = br.readLine().trim();
+ }
- String workerPid = stringBuilder.toString().trim();
+ ProfileAction profileAction = profileRequest.get_action();
+ String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
+
+ // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
+ // The profiler plugin script validates if JVM is recording before starting another recording.
+ List<String> command = mkCommand(profileAction, stop, workerPid, targetDir);
+ try {
+ ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix, stop);
+ launchProfilerActionForWorker(user, targetDir, command, env, actionExitCallback, logPrefix);
+ } catch (IOException e) {
+ LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
+ } catch (RuntimeException e) {
+ LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port);
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error running profiler actions, will retry again later");
+ }
+ }
+
+ private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
+ final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
+ File targetFile = new File(targetDir);
+ if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ LOG.info("Running as user:{} command:{}", user, commands);
+ String containerFile = Utils.containerFilePath(targetDir);
+ if (Utils.checkFileExists(containerFile)) {
+ SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
+ }
+ String scriptFile = Utils.scriptFilePath(targetDir);
+ if (Utils.checkFileExists(scriptFile)) {
+ SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
+ }
+ String script = Utils.writeScript(targetDir, commands, environment);
+ List<String> args = new ArrayList<>();
+ args.add("profiler");
+ args.add(targetDir);
+ args.add(script);
+ SupervisorUtils.processLauncher(conf, user, null, args, environment, logPrefix, exitCodeCallable, targetFile);
+ } else {
+ Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
+ }
+ }
+
+ private List<String> mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
+ if (action == ProfileAction.JMAP_DUMP) {
+ return jmapDumpCmd(workerPid, targetDir);
+ } else if (action == ProfileAction.JSTACK_DUMP) {
+ return jstackDumpCmd(workerPid, targetDir);
+ } else if (action == ProfileAction.JPROFILE_DUMP) {
+ return jprofileDump(workerPid, targetDir);
+ } else if (action == ProfileAction.JVM_RESTART) {
+ return jprofileJvmRestart(workerPid);
+ } else if (!stop && action == ProfileAction.JPROFILE_STOP) {
+ return jprofileStart(workerPid);
+ } else if (stop && action == ProfileAction.JPROFILE_STOP) {
+ return jprofileStop(workerPid, targetDir);
+ }
+ return Lists.newArrayList();
+ }
+
+ private List<String> jmapDumpCmd(String pid, String targetDir) {
+ return Lists.newArrayList(profileCmd, pid, "jmap", targetDir);
+ }
+
+ private List<String> jstackDumpCmd(String pid, String targetDir) {
+ return Lists.newArrayList(profileCmd, pid, "jstack", targetDir);
+ }
+
+ private List<String> jprofileStart(String pid) {
+ return Lists.newArrayList(profileCmd, pid, "start");
+ }
+
+ private List<String> jprofileStop(String pid, String targetDir) {
+ return Lists.newArrayList(profileCmd, pid, "stop", targetDir);
+ }
+
+ private List<String> jprofileDump(String pid, String targetDir) {
+ return Lists.newArrayList(profileCmd, pid, "dump", targetDir);
+ }
+
+ private List<String> jprofileJvmRestart(String pid) {
+ return Lists.newArrayList(profileCmd, pid, "kill");
+ }
+
+}