You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/01 20:53:37 UTC
[2/4] storm git commit: STORM-2910: have metrics reported in the
background
STORM-2910: have metrics reported in the background
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/48c2fda8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/48c2fda8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/48c2fda8
Branch: refs/heads/master
Commit: 48c2fda867aa1d1123f6ba9c9f623ae80c9df280
Parents: ed0548e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Jan 25 09:49:20 2018 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Jan 25 09:49:20 2018 -0600
----------------------------------------------------------------------
.../storm/daemon/supervisor/Container.java | 18 ++++---
.../daemon/supervisor/OnlyLatestExecutor.java | 55 ++++++++++++++++++++
.../daemon/supervisor/ReadClusterState.java | 6 ++-
.../apache/storm/daemon/supervisor/Slot.java | 33 +++++++-----
.../storm/daemon/supervisor/Supervisor.java | 16 +++++-
.../storm/daemon/supervisor/SlotTest.java | 15 +++---
6 files changed, 112 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/48c2fda8/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index f45ce25..a06e44c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -50,6 +50,7 @@ import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
@@ -709,7 +710,7 @@ public abstract class Container implements Killable {
/**
* Send worker metrics to Nimbus.
*/
- void processMetrics() {
+ void processMetrics(OnlyLatestExecutor<Integer> exec) {
try {
if (_usedMemory.get(_port) != null) {
// Make sure we don't process too frequently.
@@ -725,20 +726,23 @@ public abstract class Container implements Killable {
long timestamp = System.currentTimeMillis();
double value = _usedMemory.get(_port).memory;
WorkerMetricPoint workerMetric = new WorkerMetricPoint(MEMORY_USED_METRIC, timestamp, value, SYSTEM_COMPONENT_ID,
- INVALID_EXECUTOR_ID, INVALID_STREAM_ID);
+ INVALID_EXECUTOR_ID, INVALID_STREAM_ID);
WorkerMetricList metricList = new WorkerMetricList();
metricList.add_to_metrics(workerMetric);
WorkerMetrics metrics = new WorkerMetrics(_topologyId, _port, hostname, metricList);
- try (NimbusClient client = NimbusClient.getConfiguredClient(_conf)) {
- client.getClient().processWorkerMetrics(metrics);
- }
-
- this.lastMetricProcessTime = currentTimeMsec;
+ exec.execute(_port, () -> {
+ try (NimbusClient client = NimbusClient.getConfiguredClient(_conf)) {
+ client.getClient().processWorkerMetrics(metrics);
+ } catch (Exception e) {
+ LOG.error("Failed to process metrics", e);
+ }
+ });
}
} catch (Exception e) {
LOG.error("Failed to process metrics", e);
+ } finally {
this.lastMetricProcessTime = System.currentTimeMillis();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/48c2fda8/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java
new file mode 100644
index 0000000..bd73766
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor;
+
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Wraps an {@link Executor} so that work can be submitted under a key, keeping at most one pending
+ * {@link Runnable} per key. If the previous submission for a key has not started running yet, it is
+ * replaced with the latest one, so only the most recent work for each key gets run.
+ *
+ * <p>{@link #execute(Object, Runnable)} may be called from multiple threads. It does not serialize
+ * execution per key: once a pending runnable has been picked up, a new submission for the same key
+ * schedules a new task, which can overlap with the first one if the wrapped executor has more than
+ * one thread.
+ *
+ * @param <K> the type of the keys used to de-duplicate submissions.
+ */
+public class OnlyLatestExecutor<K> {
+    private final Executor exec;
+    // The runnable waiting to be picked up for each key.
+    private final ConcurrentMap<K, Runnable> latest;
+
+    /**
+     * Create a new de-duplicating executor.
+     *
+     * @param exec the executor that actually runs the submitted work; must not be null.
+     */
+    public OnlyLatestExecutor(Executor exec) {
+        this.exec = Objects.requireNonNull(exec, "exec");
+        latest = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Run something in the future, replacing any earlier submission for the same key that has not started yet.
+     *
+     * @param key what to use to dedupe things; must not be null.
+     * @param r what you want to run; must not be null.
+     * @throws RejectedExecutionException if the wrapped executor refuses the task (for example because it has
+     *     been shut down). The pending entry for {@code key} is cleared first, so a later call can schedule again.
+     */
+    public void execute(final K key, Runnable r) {
+        Runnable old = latest.put(key, r);
+        if (old == null) {
+            // Nothing was pending for this key, so no queued task will pick up r; schedule one.
+            try {
+                exec.execute(() -> {
+                    // Take whatever is the latest runnable for this key when the task actually runs.
+                    Runnable run = latest.remove(key);
+                    if (run != null) {
+                        run.run();
+                    }
+                });
+            } catch (RejectedExecutionException e) {
+                // No task was queued. Without this cleanup the entry would stay forever and every later
+                // submission for this key would be silently dropped.
+                latest.remove(key);
+                throw e;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/48c2fda8/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 884efcb..5d8bb33 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -65,7 +65,8 @@ public class ReadClusterState implements Runnable, AutoCloseable {
private final String host;
private final LocalState localState;
private final AtomicReference<Map<Long, LocalAssignment>> cachedAssignments;
-
+ private final OnlyLatestExecutor<Integer> metricsExec;
+
public ReadClusterState(Supervisor supervisor) throws Exception {
this.superConf = supervisor.getConf();
this.stormClusterState = supervisor.getStormClusterState();
@@ -77,6 +78,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
this.host = supervisor.getHostName();
this.localState = supervisor.getLocalState();
this.cachedAssignments = supervisor.getCurrAssignment();
+ this.metricsExec = new OnlyLatestExecutor<>(supervisor.getHeartbeatExecutor());
this.launcher = ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext());
@@ -108,7 +110,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
private Slot mkSlot(int port) throws Exception {
return new Slot(localizer, superConf, launcher, host, port,
- localState, stormClusterState, iSuper, cachedAssignments);
+ localState, stormClusterState, iSuper, cachedAssignments, metricsExec);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/48c2fda8/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index fe30c93..6700291 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -95,12 +95,14 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
public final ISupervisor iSupervisor;
public final LocalState localState;
public final BlobChangingCallback changingCallback;
-
+ public final OnlyLatestExecutor<Integer> metricsExec;
+
StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
- long killSleepMs, long monitorFreqMs,
- ContainerLauncher containerLauncher, String host, int port,
- ISupervisor iSupervisor, LocalState localState,
- BlobChangingCallback changingCallback) {
+ long killSleepMs, long monitorFreqMs,
+ ContainerLauncher containerLauncher, String host, int port,
+ ISupervisor iSupervisor, LocalState localState,
+ BlobChangingCallback changingCallback,
+ OnlyLatestExecutor<Integer> metricsExec) {
this.localizer = localizer;
this.hbTimeoutMs = hbTimeoutMs;
this.firstHbTimeoutMs = firstHbTimeoutMs;
@@ -112,6 +114,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
this.iSupervisor = iSupervisor;
this.localState = localState;
this.changingCallback = changingCallback;
+ this.metricsExec = metricsExec;
}
}
@@ -937,7 +940,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
dynamicState = dynamicState.withProfileActions(mod, modPending);
}
- dynamicState.container.processMetrics();
+ dynamicState.container.processMetrics(staticState.metricsExec);
Time.sleep(staticState.monitorFreqMs);
return dynamicState;
@@ -971,14 +974,17 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
private volatile boolean done = false;
private volatile DynamicState dynamicState;
private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments;
-
+ private final OnlyLatestExecutor<Integer> metricsExec;
+
public Slot(AsyncLocalizer localizer, Map<String, Object> conf,
- ContainerLauncher containerLauncher, String host,
- int port, LocalState localState,
- IStormClusterState clusterState,
- ISupervisor iSupervisor,
- AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
+ ContainerLauncher containerLauncher, String host,
+ int port, LocalState localState,
+ IStormClusterState clusterState,
+ ISupervisor iSupervisor,
+ AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments,
+ OnlyLatestExecutor<Integer> metricsExec) throws Exception {
super("SLOT_"+port);
+ this.metricsExec = metricsExec;
this.cachedCurrentAssignments = cachedCurrentAssignments;
this.clusterState = clusterState;
@@ -1024,7 +1030,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
port,
iSupervisor,
localState,
- this);
+ this,
+ metricsExec);
this.newAssignment.set(dynamicState.newAssignment);
if (MachineState.RUNNING == dynamicState.state) {
//We are running so we should recover the blobs.
http://git-wip-us.apache.org/repos/asf/storm/blob/48c2fda8/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index ee5a55c..147a8aa 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -27,6 +27,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
@@ -74,6 +76,10 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
private final StormTimer heartbeatTimer;
private final StormTimer eventTimer;
+ //Right now this is only used for sending metrics to nimbus,
+ // but we may want to combine it with the heartbeatTimer at some point
+ // to really make this work well.
+ private final ExecutorService heartbeatExecutor;
private final AsyncLocalizer asyncLocalizer;
private EventManager eventManager;
private ReadClusterState readState;
@@ -89,6 +95,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
this.upTime = Utils.makeUptimeComputer();
this.stormVersion = VersionInfo.getVersion();
this.sharedContext = sharedContext;
+ this.heartbeatExecutor = Executors.newFixedThreadPool(1);
iSupervisor.prepare(conf, ServerConfigUtils.supervisorIsupervisorDir(conf));
@@ -125,7 +132,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
}
-
+
+ /**
+ * Get the executor service that is supposed to be used for heart-beats.
+ */
+ public ExecutorService getHeartbeatExecutor() {
+ return heartbeatExecutor;
+ }
+
public String getId() {
return supervisorId;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/48c2fda8/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index f627bc7..08a5266 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.storm.daemon.supervisor;
-import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.*;
@@ -150,7 +149,7 @@ public class SlotTest {
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
ISupervisor iSuper = mock(ISupervisor.class);
StaticState staticState = new StaticState(localizer, 1000, 1000, 1000, 1000,
- containerLauncher, "localhost", 8080, iSuper, state, cb);
+ containerLauncher, "localhost", 8080, iSuper, state, cb, null);
DynamicState dynamicState = new DynamicState(null, null, null);
DynamicState nextState = Slot.handleEmpty(dynamicState, staticState);
assertEquals(MachineState.EMPTY, nextState.state);
@@ -182,7 +181,7 @@ public class SlotTest {
ISupervisor iSuper = mock(ISupervisor.class);
StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
- containerLauncher, "localhost", port, iSuper, state, cb);
+ containerLauncher, "localhost", port, iSuper, state, cb, null);
DynamicState dynamicState = new DynamicState(null, null, null)
.withNewAssignment(newAssignment);
@@ -251,7 +250,7 @@ public class SlotTest {
ISupervisor iSuper = mock(ISupervisor.class);
LocalState state = mock(LocalState.class);
StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
- containerLauncher, "localhost", port, iSuper, state, cb);
+ containerLauncher, "localhost", port, iSuper, state, cb, null);
DynamicState dynamicState = new DynamicState(assignment, container, assignment);
DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
@@ -312,7 +311,7 @@ public class SlotTest {
ISupervisor iSuper = mock(ISupervisor.class);
StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
- containerLauncher, "localhost", port, iSuper, state, cb);
+ containerLauncher, "localhost", port, iSuper, state, cb, null);
DynamicState dynamicState = new DynamicState(cAssignment, cContainer, nAssignment);
DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
@@ -393,7 +392,7 @@ public class SlotTest {
ISupervisor iSuper = mock(ISupervisor.class);
LocalState state = mock(LocalState.class);
StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
- containerLauncher, "localhost", port, iSuper, state, cb);
+ containerLauncher, "localhost", port, iSuper, state, cb, null);
DynamicState dynamicState = new DynamicState(cAssignment, cContainer, null);
DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
@@ -454,7 +453,7 @@ public class SlotTest {
ISupervisor iSuper = mock(ISupervisor.class);
LocalState state = mock(LocalState.class);
StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
- containerLauncher, "localhost", port, iSuper, state, cb);
+ containerLauncher, "localhost", port, iSuper, state, cb, null);
Set<TopoProfileAction> profileActions = new HashSet<>();
ProfileRequest request = new ProfileRequest();
request.set_action(ProfileAction.JPROFILE_STOP);
@@ -532,7 +531,7 @@ public class SlotTest {
ISupervisor iSuper = mock(ISupervisor.class);
long heartbeatTimeoutMs = 5000;
StaticState staticState = new StaticState(localizer, heartbeatTimeoutMs, 120_000, 1000, 1000,
- containerLauncher, "localhost", port, iSuper, state, cb);
+ containerLauncher, "localhost", port, iSuper, state, cb, null);
Set<Slot.BlobChanging> changing = new HashSet<>();
LocallyCachedBlob stormJar = mock(LocallyCachedBlob.class);