You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/09/17 19:54:04 UTC
[3/6] storm git commit: STORM-3197: Make StormMetricsRegistry non-static
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
index ee7f621..2ac1c20 100644
--- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
@@ -55,12 +55,14 @@ public class RocksDbStore implements MetricStore, AutoCloseable {
* Create metric store instance using the configurations provided via the config map.
*
* @param config Storm config map
+ * @param metricsRegistry The Nimbus daemon metrics registry
* @throws MetricException on preparation error
*/
- public void prepare(Map<String, Object> config) throws MetricException {
+ @Override
+ public void prepare(Map<String, Object> config, StormMetricsRegistry metricsRegistry) throws MetricException {
validateConfig(config);
- this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
+ this.failureMeter = metricsRegistry.registerMeter("RocksDB:metric-failures");
RocksDB.loadLibrary();
boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
@@ -87,7 +89,7 @@ public class RocksDbStore implements MetricStore, AutoCloseable {
if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
deletionPeriod = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS).toString());
}
- metricsCleaner = new MetricsCleaner(this, retentionHours, deletionPeriod, failureMeter);
+ metricsCleaner = new MetricsCleaner(this, retentionHours, deletionPeriod, failureMeter, metricsRegistry);
// create thread to process insertion of all metrics
metricsWriter = new RocksDbMetricsWriter(this, this.queue, this.failureMeter);
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
index 3783fdb..edd7444 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -48,12 +48,14 @@ import org.slf4j.LoggerFactory;
* A callback function when nimbus gains leadership.
*/
public class LeaderListenerCallback {
- private static final Meter numGainedLeader = StormMetricsRegistry.registerMeter("nimbus:num-gained-leadership");
- private static final Meter numLostLeader = StormMetricsRegistry.registerMeter("nimbus:num-lost-leadership");
private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallback.class);
private static final String STORM_JAR_SUFFIX = "-stormjar.jar";
private static final String STORM_CODE_SUFFIX = "-stormcode.ser";
private static final String STORM_CONF_SUFFIX = "-stormconf.ser";
+
+ private final Meter numGainedLeader;
+ private final Meter numLostLeader;
+
private final BlobStore blobStore;
private final TopoCache tc;
private final IStormClusterState clusterState;
@@ -73,7 +75,7 @@ public class LeaderListenerCallback {
* @param acls zookeeper acls
*/
public LeaderListenerCallback(Map conf, CuratorFramework zk, LeaderLatch leaderLatch, BlobStore blobStore,
- TopoCache tc, IStormClusterState clusterState, List<ACL> acls) {
+ TopoCache tc, IStormClusterState clusterState, List<ACL> acls, StormMetricsRegistry metricsRegistry) {
this.blobStore = blobStore;
this.tc = tc;
this.clusterState = clusterState;
@@ -81,6 +83,8 @@ public class LeaderListenerCallback {
this.leaderLatch = leaderLatch;
this.conf = conf;
this.acls = acls;
+ this.numGainedLeader = metricsRegistry.registerMeter("nimbus:num-gained-leadership");
+ this.numLostLeader = metricsRegistry.registerMeter("nimbus:num-lost-leadership");
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java b/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
index 2eea634..5144151 100644
--- a/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
+++ b/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
@@ -12,6 +12,7 @@
package org.apache.storm.pacemaker;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import java.util.ArrayList;
@@ -27,35 +28,41 @@ import org.apache.storm.generated.HBServerMessageType;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.shade.uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class Pacemaker implements IServerMessageHandler {
private static final Logger LOG = LoggerFactory.getLogger(Pacemaker.class);
- private final static Meter meterSendPulseCount = StormMetricsRegistry.registerMeter("pacemaker:send-pulse-count");
- private final static Meter meterTotalReceivedSize = StormMetricsRegistry.registerMeter("pacemaker:total-receive-size");
- private final static Meter meterGetPulseCount = StormMetricsRegistry.registerMeter("pacemaker:get-pulse=count");
- private final static Meter meterTotalSentSize = StormMetricsRegistry.registerMeter("pacemaker:total-sent-size");
- private final static Histogram histogramHeartbeatSize = StormMetricsRegistry.registerHistogram("pacemaker:heartbeat-size");
+ private final Meter meterSendPulseCount;
+ private final Meter meterTotalReceivedSize;
+ private final Meter meterGetPulseCount;
+ private final Meter meterTotalSentSize;
+ private final Histogram histogramHeartbeatSize;
private final Map<String, byte[]> heartbeats;
private final Map<String, Object> conf;
-
- public Pacemaker(Map<String, Object> conf) {
+ public Pacemaker(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
heartbeats = new ConcurrentHashMap<>();
this.conf = conf;
- StormMetricsRegistry.registerGauge("pacemaker:size-total-keys", heartbeats::size);
- StormMetricsRegistry.startMetricsReporters(conf);
+ this.meterSendPulseCount = metricsRegistry.registerMeter("pacemaker:send-pulse-count");
+ this.meterTotalReceivedSize = metricsRegistry.registerMeter("pacemaker:total-receive-size");
+ this.meterGetPulseCount = metricsRegistry.registerMeter("pacemaker:get-pulse=count");
+ this.meterTotalSentSize = metricsRegistry.registerMeter("pacemaker:total-sent-size");
+ this.histogramHeartbeatSize = metricsRegistry.registerHistogram("pacemaker:heartbeat-size", new ExponentiallyDecayingReservoir());
+ metricsRegistry.registerGauge("pacemaker:size-total-keys", heartbeats::size);
}
public static void main(String[] args) {
SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
Map<String, Object> conf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readStormConfig());
- final Pacemaker serverHandler = new Pacemaker(conf);
+ StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+ final Pacemaker serverHandler = new Pacemaker(conf, metricsRegistry);
serverHandler.launchServer();
+ metricsRegistry.startMetricsReporters(conf);
+ Utils.addShutdownHookWithForceKillIn1Sec(metricsRegistry::stopMetricsReporters);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index e8d771d..b8bc69a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -39,6 +39,7 @@ import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
@@ -80,17 +81,19 @@ public class Cluster implements ISchedulingState {
private final Map<String, Map<WorkerSlot, NormalizedResourceRequest>> nodeToScheduledResourcesCache;
private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
+ private final ResourceMetrics resourceMetrics;
private SchedulerAssignmentImpl assignment;
private Set<String> blackListedHosts = new HashSet<>();
private INimbus inimbus;
public Cluster(
INimbus nimbus,
+ ResourceMetrics resourceMetrics,
Map<String, SupervisorDetails> supervisors,
Map<String, ? extends SchedulerAssignment> map,
Topologies topologies,
Map<String, Object> conf) {
- this(nimbus, supervisors, map, topologies, conf, null, null, null);
+ this(nimbus, resourceMetrics, supervisors, map, topologies, conf, null, null, null);
}
/**
@@ -99,6 +102,7 @@ public class Cluster implements ISchedulingState {
public Cluster(Cluster src) {
this(
src.inimbus,
+ src.resourceMetrics,
src.supervisors,
src.assignments,
src.topologies,
@@ -118,6 +122,7 @@ public class Cluster implements ISchedulingState {
public Cluster(Cluster src, Topologies topologies) {
this(
src.inimbus,
+ src.resourceMetrics,
src.supervisors,
src.assignments,
topologies,
@@ -129,6 +134,7 @@ public class Cluster implements ISchedulingState {
private Cluster(
INimbus nimbus,
+ ResourceMetrics resourceMetrics,
Map<String, SupervisorDetails> supervisors,
Map<String, ? extends SchedulerAssignment> assignments,
Topologies topologies,
@@ -137,6 +143,7 @@ public class Cluster implements ISchedulingState {
Set<String> blackListedHosts,
Map<String, List<String>> networkTopography) {
this.inimbus = nimbus;
+ this.resourceMetrics = resourceMetrics;
this.supervisors.putAll(supervisors);
this.nodeToScheduledResourcesCache = new HashMap<>(this.supervisors.size());
this.nodeToUsedSlotsCache = new HashMap<>(this.supervisors.size());
@@ -466,7 +473,7 @@ public class Cluster implements ISchedulingState {
for (SchedulerAssignment assignment: assignments.values()) {
for (Entry<WorkerSlot, WorkerResources> entry: assignment.getScheduledResources().entrySet()) {
if (sd.getId().equals(entry.getKey().getNodeId())) {
- ret.remove(entry.getValue());
+ ret.remove(entry.getValue(), getResourceMetrics());
}
}
}
@@ -812,7 +819,7 @@ public class Cluster implements ISchedulingState {
for (SupervisorDetails sup : supervisors.values()) {
if (!isBlackListed(sup.getId()) && !blacklistedSupervisorIds.contains(sup.getId())) {
available.add(sup.getTotalResources());
- available.remove(getAllScheduledResourcesForNode(sup.getId()));
+ available.remove(getAllScheduledResourcesForNode(sup.getId()), getResourceMetrics());
}
}
return available;
@@ -971,6 +978,10 @@ public class Cluster implements ISchedulingState {
nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).add(workerSlot);
}
+ public ResourceMetrics getResourceMetrics() {
+ return resourceMetrics;
+ }
+
@Override
public NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId) {
return totalResourcesPerNodeCache.computeIfAbsent(nodeId, (nid) -> {
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
index 4ac1057..2700871 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
@@ -17,6 +17,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index 5d781aa..757f256 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.Config;
-import org.apache.storm.Constants;
-import org.apache.storm.daemon.Acker;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.ComponentType;
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index 5034f42..1e5b04d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -42,6 +42,7 @@ public class BlacklistScheduler implements IScheduler {
public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300;
private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
private final IScheduler underlyingScheduler;
+ private final StormMetricsRegistry metricsRegistry;
protected int toleranceTime;
protected int toleranceCount;
protected int resumeTime;
@@ -55,8 +56,9 @@ public class BlacklistScheduler implements IScheduler {
protected Set<String> blacklistHost;
private Map<String, Object> conf;
- public BlacklistScheduler(IScheduler underlyingScheduler) {
+ public BlacklistScheduler(IScheduler underlyingScheduler, StormMetricsRegistry metricsRegistry) {
this.underlyingScheduler = underlyingScheduler;
+ this.metricsRegistry = metricsRegistry;
}
@Override
@@ -89,7 +91,7 @@ public class BlacklistScheduler implements IScheduler {
blacklistHost = new HashSet<>();
//nimbus:num-blacklisted-supervisor + non-blacklisted supervisor = nimbus:num-supervisors
- StormMetricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", () -> blacklistHost.size());
+ metricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", () -> blacklistHost.size());
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
index f7abc04..6c823ab 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java
@@ -71,7 +71,7 @@ public class RasBlacklistStrategy extends DefaultBlacklistStrategy {
//Now we need to free up some resources...
Map<String, SupervisorDetails> availableSupervisors = cluster.getSupervisors();
NormalizedResourceOffer shortage = new NormalizedResourceOffer(needed);
- shortage.remove(available);
+ shortage.remove(available, cluster.getResourceMetrics());
int shortageSlots = neededSlots - availableSlots;
LOG.debug("Need {} and {} slots.", needed, neededSlots);
LOG.debug("Available {} and {} slots.", available, availableSlots);
@@ -86,7 +86,7 @@ public class RasBlacklistStrategy extends DefaultBlacklistStrategy {
NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
readyToRemove.add(supervisor);
- shortage.remove(sdAvailable);
+ shortage.remove(sdAvailable, cluster.getResourceMetrics());
shortageSlots -= sdAvailableSlots;
LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
sdAvailable, sdAvailableSlots, shortage, shortageSlots);
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 5975780..89c0460 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -445,7 +445,7 @@ public class RAS_Node {
public NormalizedResourceOffer getTotalAvailableResources() {
if (sup != null) {
NormalizedResourceOffer availableResources = new NormalizedResourceOffer(sup.getTotalResources());
- if (availableResources.remove(cluster.getAllScheduledResourcesForNode(sup.getId()))) {
+ if (availableResources.remove(cluster.getAllScheduledResourcesForNode(sup.getId()), cluster.getResourceMetrics())) {
if (!loggedUnderageUsage) {
LOG.error("Resources on {} became negative and was clamped to 0 {}.", hostname, availableResources);
loggedUnderageUsage = true;
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index 7ef5f74..366995e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -39,8 +39,7 @@ public class ResourceUtils {
return null;
}
- public static Map<String, NormalizedResourceRequest> getBoltsResources(StormTopology topology,
- Map<String, Object> topologyConf) {
+ public static Map<String, NormalizedResourceRequest> getBoltsResources(StormTopology topology, Map<String, Object> topologyConf) {
Map<String, NormalizedResourceRequest> boltResources = new HashMap<>();
if (topology.get_bolts() != null) {
for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
@@ -55,8 +54,8 @@ public class ResourceUtils {
return boltResources;
}
- public static NormalizedResourceRequest getSpoutResources(StormTopology topology, Map<String, Object> topologyConf,
- String componentId) {
+ public static NormalizedResourceRequest getSpoutResources(StormTopology topology,
+ Map<String, Object> topologyConf, String componentId) {
if (topology.get_spouts() != null) {
SpoutSpec spout = topology.get_spouts().get(componentId);
return new NormalizedResourceRequest(spout.get_common(), topologyConf, componentId);
@@ -65,7 +64,7 @@ public class ResourceUtils {
}
public static Map<String, NormalizedResourceRequest> getSpoutsResources(StormTopology topology,
- Map<String, Object> topologyConf) {
+ Map<String, Object> topologyConf) {
Map<String, NormalizedResourceRequest> spoutResources = new HashMap<>();
if (topology.get_spouts() != null) {
for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
index c680be7..707c893 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -84,14 +84,15 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
/**
* Remove the resources in other from this.
* @param other the resources to be removed.
+ * @param resourceMetrics The resource related metrics
* @return true if one or more resources in other were larger than available resources in this, else false.
*/
- public boolean remove(NormalizedResourcesWithMemory other) {
- boolean negativeResources = normalizedResources.remove(other.getNormalizedResources());
+ public boolean remove(NormalizedResourcesWithMemory other, ResourceMetrics resourceMetrics) {
+ boolean negativeResources = normalizedResources.remove(other.getNormalizedResources(), resourceMetrics);
totalMemoryMb -= other.getTotalMemoryMb();
if (totalMemoryMb < 0.0) {
negativeResources = true;
- NormalizedResources.numNegativeResourceEvents.mark();
+ resourceMetrics.getNegativeResourceEventsMeter().mark();
totalMemoryMb = 0.0;
}
return negativeResources;
@@ -100,14 +101,15 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
/**
* Remove the resources in other from this.
* @param other the resources to be removed.
+ * @param resourceMetrics The resource related metrics
* @return true if one or more resources in other were larger than available resources in this, else false.
*/
- public boolean remove(WorkerResources other) {
+ public boolean remove(WorkerResources other, ResourceMetrics resourceMetrics) {
boolean negativeResources = normalizedResources.remove(other);
totalMemoryMb -= (other.get_mem_off_heap() + other.get_mem_on_heap());
if (totalMemoryMb < 0.0) {
negativeResources = true;
- NormalizedResources.numNegativeResourceEvents.mark();
+ resourceMetrics.getNegativeResourceEventsMeter().mark();
totalMemoryMb = 0.0;
}
return negativeResources;
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
index 14c6846..478a8be 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -43,7 +43,7 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
private double offHeap;
private NormalizedResourceRequest(Map<String, ? extends Number> resources,
- Map<String, Double> defaultResources) {
+ Map<String, Double> defaultResources) {
if (resources == null && defaultResources == null) {
onHeap = 0.0;
offHeap = 0.0;
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index 1e6af6f..6419406 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -35,16 +35,13 @@ import org.slf4j.LoggerFactory;
public class NormalizedResources {
private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
- public static final Meter numNegativeResourceEvents = StormMetricsRegistry.registerMeter("nimbus:num-negative-resource-events");
-
-
public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
static {
resetResourceNames();
}
-
+
private double cpu;
private double[] otherResources;
@@ -130,14 +127,15 @@ public class NormalizedResources {
* Remove the other resources from this. This is the same as subtracting the resources in other from this.
*
* @param other the resources we want removed.
+ * @param resourceMetrics The resource related metrics
* @return true if the resources would have gone negative, but were clamped to 0.
*/
- public boolean remove(NormalizedResources other) {
+ public boolean remove(NormalizedResources other, ResourceMetrics resourceMetrics) {
boolean ret = false;
this.cpu -= other.cpu;
if (cpu < 0.0) {
ret = true;
- numNegativeResourceEvents.mark();
+ resourceMetrics.getNegativeResourceEventsMeter().mark();
cpu = 0.0;
}
int otherLength = other.otherResources.length;
@@ -146,7 +144,7 @@ public class NormalizedResources {
otherResources[i] -= other.otherResources[i];
if (otherResources[i] < 0.0) {
ret = true;
- numNegativeResourceEvents.mark();
+ resourceMetrics.getNegativeResourceEventsMeter().mark();
otherResources[i] = 0.0;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMetrics.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMetrics.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMetrics.java
new file mode 100644
index 0000000..851499b
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMetrics.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import com.codahale.metrics.Meter;
+import org.apache.storm.metric.StormMetricsRegistry;
+
+public class ResourceMetrics {
+
+ private final Meter numNegativeResourceEvents;
+
+ public ResourceMetrics(StormMetricsRegistry metricsRegistry) {
+ numNegativeResourceEvents = metricsRegistry.registerMeter("nimbus:num-negative-resource-events");
+ }
+
+ public Meter getNegativeResourceEventsMeter() {
+ return numNegativeResourceEvents;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 59b0f31..b5da8db 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -45,6 +45,7 @@ import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.collect.Sets;
import org.slf4j.Logger;
@@ -632,12 +633,14 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
*/
static class AllResources {
List<ObjectResources> objectResources = new LinkedList<>();
- NormalizedResourceOffer availableResourcesOverall = new NormalizedResourceOffer();
- NormalizedResourceOffer totalResourcesOverall = new NormalizedResourceOffer();
+ final NormalizedResourceOffer availableResourcesOverall;
+ final NormalizedResourceOffer totalResourcesOverall;
String identifier;
public AllResources(String identifier) {
this.identifier = identifier;
+ this.availableResourcesOverall = new NormalizedResourceOffer();
+ this.totalResourcesOverall = new NormalizedResourceOffer();
}
public AllResources(AllResources other) {
@@ -666,12 +669,14 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
*/
static class ObjectResources {
public final String id;
- public NormalizedResourceOffer availableResources = new NormalizedResourceOffer();
- public NormalizedResourceOffer totalResources = new NormalizedResourceOffer();
+ public NormalizedResourceOffer availableResources;
+ public NormalizedResourceOffer totalResources;
public double effectiveResources = 0.0;
public ObjectResources(String id) {
this.id = id;
+ this.availableResources = new NormalizedResourceOffer();
+ this.totalResources = new NormalizedResourceOffer();
}
public ObjectResources(ObjectResources other) {
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
index 03fec1f..6dc33e8 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.nimbus.TopoCache;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.LeaderListenerCallback;
import org.apache.storm.nimbus.NimbusInfo;
@@ -45,10 +46,12 @@ public class LeaderElectorImp implements ILeaderElector {
private final TopoCache tc;
private final IStormClusterState clusterState;
private final List<ACL> acls;
+ private final StormMetricsRegistry metricsRegistry;
public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,
AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,
- BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls) {
+ BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
+ StormMetricsRegistry metricsRegistry) {
this.conf = conf;
this.servers = servers;
this.zk = zk;
@@ -60,6 +63,7 @@ public class LeaderElectorImp implements ILeaderElector {
this.tc = tc;
this.clusterState = clusterState;
this.acls = acls;
+ this.metricsRegistry = metricsRegistry;
}
@Override
@@ -72,7 +76,8 @@ public class LeaderElectorImp implements ILeaderElector {
// if this latch is already closed, we need to create new instance.
if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
- LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls);
+ LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls,
+ metricsRegistry);
leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(callback));
LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
index f30c246..5468573 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
@@ -31,6 +31,7 @@ import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.nimbus.TopoCache;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.LeaderListenerCallback;
import org.apache.storm.nimbus.NimbusInfo;
@@ -146,13 +147,14 @@ public class Zookeeper {
* @throws UnknownHostException
*/
public static ILeaderElector zkLeaderElector(Map<String, Object> conf, CuratorFramework zkClient, BlobStore blobStore,
- final TopoCache tc, IStormClusterState clusterState, List<ACL> acls) throws
- UnknownHostException {
- return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore, tc, clusterState, acls);
+ final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
+ StormMetricsRegistry metricsRegistry) throws UnknownHostException {
+ return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore, tc, clusterState, acls, metricsRegistry);
}
protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, CuratorFramework zk, BlobStore blobStore,
- final TopoCache tc, IStormClusterState clusterState, List<ACL> acls) throws
+ final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
+ StormMetricsRegistry metricsRegistry) throws
UnknownHostException {
List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
String leaderLockPath = "/leader-lock";
@@ -160,9 +162,9 @@ public class Zookeeper {
AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
new AtomicReference<>(leaderLatchListenerImpl(
- new LeaderListenerCallback(conf, zk, leaderLatchAtomicReference.get(), blobStore, tc, clusterState, acls)));
+ new LeaderListenerCallback(conf, zk, leaderLatchAtomicReference.get(), blobStore, tc, clusterState, acls, metricsRegistry)));
return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
- leaderLatchListenerAtomicReference, blobStore, tc, clusterState, acls);
+ leaderLatchListenerAtomicReference, blobStore, tc, clusterState, acls, metricsRegistry);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/PacemakerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/PacemakerTest.java b/storm-server/src/test/java/org/apache/storm/PacemakerTest.java
index 5f7f8e9..dfd53be 100644
--- a/storm-server/src/test/java/org/apache/storm/PacemakerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/PacemakerTest.java
@@ -20,6 +20,7 @@ import org.apache.storm.generated.HBMessage;
import org.apache.storm.generated.HBMessageData;
import org.apache.storm.generated.HBPulse;
import org.apache.storm.generated.HBServerMessageType;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.pacemaker.Pacemaker;
import org.apache.storm.utils.Utils;
import org.junit.Assert;
@@ -31,15 +32,16 @@ public class PacemakerTest {
private HBMessage hbMessage;
private int mid;
private Random random;
+ private Pacemaker handler;
@Before
public void init() {
random = new Random(100);
+ handler = new Pacemaker(new ConcurrentHashMap<>(), new StormMetricsRegistry());
}
@Test
public void testServerCreatePath() {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
messageWithRandId(HBServerMessageType.CREATE_PATH, HBMessageData.path("/testpath"));
HBMessage response = handler.handleMessage(hbMessage, true);
Assert.assertEquals(mid, response.get_message_id());
@@ -49,7 +51,6 @@ public class PacemakerTest {
@Test
public void testServerExistsFalse() {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
messageWithRandId(HBServerMessageType.EXISTS, HBMessageData.path("/testpath"));
HBMessage badResponse = handler.handleMessage(hbMessage, false);
HBMessage goodResponse = handler.handleMessage(hbMessage, true);
@@ -65,7 +66,6 @@ public class PacemakerTest {
public void testServerExistsTrue() {
String path = "/exists_path";
String dataString = "pulse data";
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
HBPulse hbPulse = new HBPulse();
hbPulse.set_id(path);
hbPulse.set_details(Utils.javaSerialize(dataString));
@@ -87,7 +87,6 @@ public class PacemakerTest {
public void testServerSendPulseGetPulse() throws UnsupportedEncodingException {
String path = "/pulsepath";
String dataString = "pulse data";
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
HBPulse hbPulse = new HBPulse();
hbPulse.set_id(path);
hbPulse.set_details(dataString.getBytes("UTF-8"));
@@ -106,7 +105,6 @@ public class PacemakerTest {
@Test
public void testServerGetAllPulseForPath() {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
messageWithRandId(HBServerMessageType.GET_ALL_PULSE_FOR_PATH, HBMessageData.path("/testpath"));
HBMessage badResponse = handler.handleMessage(hbMessage, false);
HBMessage goodResponse = handler.handleMessage(hbMessage, true);
@@ -120,7 +118,6 @@ public class PacemakerTest {
@Test
public void testServerGetAllNodesForPath() throws UnsupportedEncodingException {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
makeNode(handler, "/some-root-path/foo");
makeNode(handler, "/some-root-path/bar");
makeNode(handler, "/some-root-path/baz");
@@ -162,7 +159,6 @@ public class PacemakerTest {
@Test
public void testServerGetPulse() throws UnsupportedEncodingException {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
makeNode(handler, "/some-root/GET_PULSE");
messageWithRandId(HBServerMessageType.GET_PULSE, HBMessageData.path("/some-root/GET_PULSE"));
HBMessage badResponse = handler.handleMessage(hbMessage, false);
@@ -180,7 +176,6 @@ public class PacemakerTest {
@Test
public void testServerDeletePath() throws UnsupportedEncodingException {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
makeNode(handler, "/some-root/DELETE_PATH/foo");
makeNode(handler, "/some-root/DELETE_PATH/bar");
makeNode(handler, "/some-root/DELETE_PATH/baz");
@@ -202,7 +197,6 @@ public class PacemakerTest {
@Test
public void testServerDeletePulseId() throws UnsupportedEncodingException {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
makeNode(handler, "/some-root/DELETE_PULSE_ID/foo");
makeNode(handler, "/some-root/DELETE_PULSE_ID/bar");
makeNode(handler, "/some-root/DELETE_PULSE_ID/baz");
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java b/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java
index e4b586b..a0825d9 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/drpc/DRPCTest.java
@@ -47,6 +47,8 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import org.apache.storm.metric.StormMetricsRegistry;
+
public class DRPCTest {
private static final ExecutorService exec = Executors.newCachedThreadPool();
@@ -80,7 +82,7 @@ public class DRPCTest {
@Test
public void testGoodBlocking() throws Exception {
- try (DRPC server = new DRPC(null, 100)) {
+ try (DRPC server = new DRPC(new StormMetricsRegistry(), null, 100)) {
Future<String> found = exec.submit(() -> server.executeBlocking("testing", "test"));
DRPCRequest request = getNextAvailableRequest(server, "testing");
assertNotNull(request);
@@ -94,7 +96,7 @@ public class DRPCTest {
@Test
public void testFailedBlocking() throws Exception {
- try (DRPC server = new DRPC(null, 100)) {
+ try (DRPC server = new DRPC(new StormMetricsRegistry(), null, 100)) {
Future<String> found = exec.submit(() -> server.executeBlocking("testing", "test"));
DRPCRequest request = getNextAvailableRequest(server, "testing");
assertNotNull(request);
@@ -116,7 +118,7 @@ public class DRPCTest {
@Test
public void testDequeueAfterTimeout() throws Exception {
long timeout = 1000;
- try (DRPC server = new DRPC(null, timeout)) {
+ try (DRPC server = new DRPC(new StormMetricsRegistry(), null, timeout)) {
long start = Time.currentTimeMillis();
try {
server.executeBlocking("testing", "test");
@@ -136,7 +138,7 @@ public class DRPCTest {
@Test
public void testDeny() throws Exception {
- try (DRPC server = new DRPC(new DenyAuthorizer(), 100)) {
+ try (DRPC server = new DRPC(new StormMetricsRegistry(), new DenyAuthorizer(), 100)) {
assertThrows(() -> server.executeBlocking("testing", "test"), AuthorizationException.class);
assertThrows(() -> server.fetchRequest("testing"), AuthorizationException.class);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
index 1094e1c..1c313be 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -41,6 +41,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ConfigUtils;
public class BasicContainerTest {
@@ -102,8 +103,8 @@ public class BasicContainerTest {
LocalState ls = mock(LocalState.class);
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
- "SUPERVISOR", supervisorPort, port, la, null, ls, null, new HashMap<>(), ops,
- "profile");
+ "SUPERVISOR", supervisorPort, port, la, null, ls, null, new StormMetricsRegistry(),
+ new HashMap<>(), ops, "profile");
//null worker id means generate one...
assertNotNull(mc._workerId);
@@ -133,8 +134,8 @@ public class BasicContainerTest {
when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
MockBasicContainer mc = new MockBasicContainer(ContainerType.RECOVER_FULL, superConf,
- "SUPERVISOR", supervisorPort, port, la, null, ls, null, new HashMap<>(), ops,
- "profile");
+ "SUPERVISOR", supervisorPort, port, la, null, ls, null, new StormMetricsRegistry(),
+ new HashMap<>(), ops, "profile");
assertEquals(workerId, mc._workerId);
}
@@ -155,7 +156,8 @@ public class BasicContainerTest {
try {
new MockBasicContainer(ContainerType.RECOVER_FULL, new HashMap<String, Object>(),
- "SUPERVISOR", supervisorPort, port, la, null, ls, null, new HashMap<>(), null, "profile");
+ "SUPERVISOR", supervisorPort, port, la, null, ls, null, new StormMetricsRegistry(),
+ new HashMap<>(), null, "profile");
fail("Container recovered worker incorrectly");
} catch (ContainerRecoveryException e) {
//Expected
@@ -182,7 +184,7 @@ public class BasicContainerTest {
when(ls.getApprovedWorkers()).thenReturn(new HashMap<>(workerState));
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
- "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops,
+ "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(), new HashMap<>(), ops,
"profile");
mc.cleanUp();
@@ -218,8 +220,8 @@ public class BasicContainerTest {
LocalState ls = mock(LocalState.class);
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
- "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops,
- "profile");
+ "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+ new HashMap<>(), ops, "profile");
//HEAP DUMP
ProfileRequest req = new ProfileRequest();
@@ -329,9 +331,8 @@ public class BasicContainerTest {
checkpoint(() -> {
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
- "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new
- HashMap<>(), ops,
- "profile");
+ "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+ new HashMap<>(), ops, "profile");
mc.launch();
@@ -432,9 +433,8 @@ public class BasicContainerTest {
checkpoint(() -> {
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
- "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new
- HashMap<>(), ops,
- "profile");
+ "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+ new HashMap<>(), ops, "profile");
mc.launch();
@@ -534,9 +534,8 @@ public class BasicContainerTest {
checkpoint(() -> {
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
- "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new
- HashMap<>(), ops,
- "profile");
+ "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+ new HashMap<>(), ops, "profile");
mc.launch();
@@ -612,8 +611,8 @@ public class BasicContainerTest {
LocalState ls = mock(LocalState.class);
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
- "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops,
- "profile");
+ "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
+ new HashMap<>(), ops, "profile");
assertListEquals(Arrays.asList(
"-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
@@ -656,10 +655,10 @@ public class BasicContainerTest {
public final List<CommandRun> workerCmds = new ArrayList<>();
public MockBasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort,
int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
- LocalState localState, String workerId, Map<String, Object> topoConf, AdvancedFSOps ops,
- String profileCmd) throws IOException {
+ LocalState localState, String workerId, StormMetricsRegistry metricsRegistry,
+ Map<String, Object> topoConf, AdvancedFSOps ops, String profileCmd) throws IOException {
super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState,
- workerId, topoConf, ops, profileCmd);
+ workerId, metricsRegistry,new ContainerMemoryTracker(metricsRegistry), topoConf, ops, profileCmd);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
index b510838..cf5ec8d 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
@@ -42,6 +42,8 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import org.apache.storm.metric.StormMetricsRegistry;
+
public class ContainerTest {
private static final Joiner PATH_JOIN = Joiner.on(File.separator).skipNulls();
private static final String DOUBLE_SEP = File.separator + File.separator;
@@ -71,7 +73,7 @@ public class ContainerTest {
LocalAssignment la = new LocalAssignment();
la.set_topology_id(topoId);
MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
- "SUPERVISOR", 6628, 8080, la, null, "worker", new HashMap<>(), ops);
+ "SUPERVISOR", 6628, 8080, la, null, "worker", new HashMap<>(), ops, new StormMetricsRegistry());
mc.kill();
assertEquals(Collections.EMPTY_LIST, mc.killedPids);
assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
@@ -134,7 +136,7 @@ public class ContainerTest {
la.set_topology_id(topoId);
la.set_owner(user);
MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
- "SUPERVISOR", 6628, 8080, la, null, workerId, topoConf, ops);
+ "SUPERVISOR", 6628, 8080, la, null, workerId, topoConf, ops, new StormMetricsRegistry());
mc.setup();
@@ -205,7 +207,7 @@ public class ContainerTest {
la.set_owner(user);
la.set_topology_id(topoId);
MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
- "SUPERVISOR", supervisorPort, port, la, iso, workerId, topoConf, ops);
+ "SUPERVISOR", supervisorPort, port, la, iso, workerId, topoConf, ops, new StormMetricsRegistry());
mc.allPids.add(pid);
mc.cleanUp();
@@ -226,9 +228,9 @@ public class ContainerTest {
public final Set<Long> allPids = new HashSet<>();
protected MockContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort,
int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
- String workerId, Map<String, Object> topoConf, AdvancedFSOps ops) throws IOException {
+ String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, StormMetricsRegistry metricsRegistry) throws IOException {
super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, workerId,
- topoConf, ops);
+ topoConf, ops, metricsRegistry, new ContainerMemoryTracker(new StormMetricsRegistry()));
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index 6f09f3f..21a9bc2 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -52,6 +52,8 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
+import org.apache.storm.metric.StormMetricsRegistry;
+
public class SlotTest {
private static final Logger LOG = LoggerFactory.getLogger(SlotTest.class);
@@ -147,9 +149,11 @@ public class SlotTest {
BlobChangingCallback cb = mock(BlobChangingCallback.class);
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
ISupervisor iSuper = mock(ISupervisor.class);
+ SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
StaticState staticState = new StaticState(localizer, 1000, 1000, 1000, 1000,
- containerLauncher, "localhost", 8080, iSuper, state, cb, null, null);
- DynamicState dynamicState = new DynamicState(null, null, null);
+ containerLauncher, "localhost", 8080, iSuper, state, cb, null, null,
+ slotMetrics);
+ DynamicState dynamicState = new DynamicState(null, null, null, slotMetrics);
DynamicState nextState = Slot.handleEmpty(dynamicState, staticState);
assertEquals(MachineState.EMPTY, nextState.state);
assertTrue(Time.currentTimeMillis() > 1000);
@@ -179,9 +183,10 @@ public class SlotTest {
when(localizer.requestDownloadTopologyBlobs(newAssignment, port, cb)).thenReturn(blobFuture);
ISupervisor iSuper = mock(ISupervisor.class);
+ SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
- containerLauncher, "localhost", port, iSuper, state, cb, null, null);
- DynamicState dynamicState = new DynamicState(null, null, null)
+ containerLauncher, "localhost", port, iSuper, state, cb, null, null, slotMetrics);
+ DynamicState dynamicState = new DynamicState(null, null, null, slotMetrics)
.withNewAssignment(newAssignment);
DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
@@ -248,9 +253,10 @@ public class SlotTest {
ISupervisor iSuper = mock(ISupervisor.class);
LocalState state = mock(LocalState.class);
+ SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
- containerLauncher, "localhost", port, iSuper, state, cb, null, null);
- DynamicState dynamicState = new DynamicState(assignment, container, assignment);
+ containerLauncher, "localhost", port, iSuper, state, cb, null, null, slotMetrics);
+ DynamicState dynamicState = new DynamicState(assignment, container, assignment, slotMetrics);
DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
assertEquals(MachineState.KILL_AND_RELAUNCH, nextState.state);
@@ -309,9 +315,10 @@ public class SlotTest {
when(localizer.requestDownloadTopologyBlobs(nAssignment, port, cb)).thenReturn(blobFuture);
ISupervisor iSuper = mock(ISupervisor.class);
+ SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
- containerLauncher, "localhost", port, iSuper, state, cb, null, null);
- DynamicState dynamicState = new DynamicState(cAssignment, cContainer, nAssignment);
+ containerLauncher, "localhost", port, iSuper, state, cb, null, null, slotMetrics);
+ DynamicState dynamicState = new DynamicState(cAssignment, cContainer, nAssignment, slotMetrics);
DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
assertEquals(MachineState.KILL, nextState.state);
@@ -390,9 +397,10 @@ public class SlotTest {
ISupervisor iSuper = mock(ISupervisor.class);
LocalState state = mock(LocalState.class);
+ SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
- containerLauncher, "localhost", port, iSuper, state, cb, null, null);
- DynamicState dynamicState = new DynamicState(cAssignment, cContainer, null);
+ containerLauncher, "localhost", port, iSuper, state, cb, null, null, slotMetrics);
+ DynamicState dynamicState = new DynamicState(cAssignment, cContainer, null, slotMetrics);
DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
assertEquals(MachineState.KILL, nextState.state);
@@ -452,7 +460,7 @@ public class SlotTest {
ISupervisor iSuper = mock(ISupervisor.class);
LocalState state = mock(LocalState.class);
StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
- containerLauncher, "localhost", port, iSuper, state, cb, null, null);
+ containerLauncher, "localhost", port, iSuper, state, cb, null, null, new SlotMetrics(new StormMetricsRegistry()));
Set<TopoProfileAction> profileActions = new HashSet<>();
ProfileRequest request = new ProfileRequest();
request.set_action(ProfileAction.JPROFILE_STOP);
@@ -467,8 +475,8 @@ public class SlotTest {
Set<TopoProfileAction> expectedPending = new HashSet<>();
expectedPending.add(profile);
-
- DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment)
+ SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
+ DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment, slotMetrics)
.withProfileActions(profileActions, Collections.<TopoProfileAction>emptySet());
DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
@@ -534,7 +542,7 @@ public class SlotTest {
ISupervisor iSuper = mock(ISupervisor.class);
long heartbeatTimeoutMs = 5000;
StaticState staticState = new StaticState(localizer, heartbeatTimeoutMs, 120_000, 1000, 1000,
- containerLauncher, "localhost", port, iSuper, state, cb, null, null);
+ containerLauncher, "localhost", port, iSuper, state, cb, null, null, new SlotMetrics(new StormMetricsRegistry()));
Set<Slot.BlobChanging> changing = new HashSet<>();
@@ -549,7 +557,8 @@ public class SlotTest {
GoodToGo.GoodToGoLatch otherJarLatch = mock(GoodToGo.GoodToGoLatch.class);
changing.add(new Slot.BlobChanging(otherAssignment, otherJar, otherJarLatch));
- DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment).withChangingBlobs(changing);
+ SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
+ DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment, slotMetrics).withChangingBlobs(changing);
DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
assertEquals(MachineState.KILL_BLOB_UPDATE, nextState.state);
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 8654d25..243499a 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -79,6 +79,8 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import org.apache.storm.metric.StormMetricsRegistry;
+
public class AsyncLocalizerTest {
private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizerTest.class);
private final String user1 = "user1";
@@ -117,7 +119,7 @@ public class AsyncLocalizerTest {
ReflectionUtils mockedRU = mock(ReflectionUtils.class);
ServerUtils mockedU = mock(ServerUtils.class);
- AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, getTestLocalizerRoot()));
+ AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, getTestLocalizerRoot(), new StormMetricsRegistry()));
LocallyCachedTopologyBlob jarBlob = mock(LocallyCachedTopologyBlob.class);
doReturn(jarBlob).when(bl).getTopoJar(topoId, la.get_owner());
when(jarBlob.getLocalVersion()).thenReturn(-1L);
@@ -213,10 +215,11 @@ public class AsyncLocalizerTest {
topoConf.put(Config.TOPOLOGY_NAME, topoName);
List<LocalizedResource> localizedList = new ArrayList<>();
- LocalizedResource simpleLocal = new LocalizedResource(simpleKey, Paths.get(localizerRoot), false, ops, conf, user);
+ StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+ LocalizedResource simpleLocal = new LocalizedResource(simpleKey, Paths.get(localizerRoot), false, ops, conf, user, metricsRegistry);
localizedList.add(simpleLocal);
- AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, localizerRoot));
+ AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, localizerRoot, metricsRegistry));
ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
try {
when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot);
@@ -889,7 +892,7 @@ public class AsyncLocalizerTest {
class TestLocalizer extends AsyncLocalizer {
TestLocalizer(Map<String, Object> conf, String baseDir) throws IOException {
- super(conf, AdvancedFSOps.make(conf), baseDir);
+ super(conf, AdvancedFSOps.make(conf), baseDir, new StormMetricsRegistry());
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java b/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
index f41c3df..db67e04 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
@@ -32,6 +32,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import org.apache.storm.metric.StormMetricsRegistry;
+
public class LocalizedResourceRetentionSetTest {
@Test
@@ -43,9 +45,10 @@ public class LocalizedResourceRetentionSetTest {
IAdvancedFSOps ops = mock(IAdvancedFSOps.class);
LocalizedResourceRetentionSet lrretset = new LocalizedResourceRetentionSet(10);
ConcurrentMap<String, LocalizedResource> lrset = new ConcurrentHashMap<>();
- LocalizedResource localresource1 = new LocalizedResource("key1", Paths.get("testfile1"), false, ops, conf, user);
+ StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+ LocalizedResource localresource1 = new LocalizedResource("key1", Paths.get("testfile1"), false, ops, conf, user, metricsRegistry);
localresource1.addReference(pna1, null);
- LocalizedResource localresource2 = new LocalizedResource("key2", Paths.get("testfile2"), false, ops, conf, user);
+ LocalizedResource localresource2 = new LocalizedResource("key2", Paths.get("testfile2"), false, ops, conf, user, metricsRegistry);
localresource2.addReference(pna1, null);
// check adding reference to local resource with topology of same name
localresource2.addReference(pna2, null);
@@ -81,17 +84,18 @@ public class LocalizedResourceRetentionSetTest {
LocalizedResourceRetentionSet lrretset = spy(new LocalizedResourceRetentionSet(10));
ConcurrentMap<String, LocalizedResource> lrFiles = new ConcurrentHashMap<>();
ConcurrentMap<String, LocalizedResource> lrArchives = new ConcurrentHashMap<>();
+ StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
// no reference to key1
LocalizedResource localresource1 = new LocalizedResource("key1", Paths.get("./target/TESTING/testfile1"), false, ops, conf,
- user);
+ user, metricsRegistry);
localresource1.setSize(10);
// no reference to archive1
LocalizedResource archiveresource1 = new LocalizedResource("archive1", Paths.get("./target/TESTING/testarchive1"), true, ops,
- conf, user);
+ conf, user, metricsRegistry);
archiveresource1.setSize(20);
// reference to key2
LocalizedResource localresource2 = new LocalizedResource("key2", Paths.get("./target/TESTING/testfile2"), false, ops, conf,
- user);
+ user, metricsRegistry);
localresource2.addReference(pna1, null);
// check adding reference to local resource with topology of same name
localresource2.addReference(pna1, null);
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java b/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
deleted file mode 100644
index 5d9b3e4..0000000
--- a/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.metric;
-
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricSet;
-import com.codahale.metrics.Timer;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static com.codahale.metrics.MetricRegistry.name;
-import static org.junit.jupiter.api.Assertions.*;
-
-class StormMetricsRegistryTest {
- private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistryTest.class);
-
- private static final String OUTER_METER = "outerMeter";
- private static final String INNER_SET = "innerSet";
- private static final String OUTER_TIMER = "outerTimer";
- private static final String INNER_METER = "innerMeter";
- private static final String INNER_TIMER = "innerTimer";
- private static final MetricSet OUTER = newMetricSetInstance();
-
- @Test
- void registerMetricSet() {
- Meter existingInnerMeter = StormMetricsRegistry.registerMeter(name(INNER_SET, INNER_METER));
-
- LOG.info("register outer set");
- StormMetricsRegistry.registerMetricSet(OUTER);
- assertSame(OUTER.getMetrics().get(OUTER_TIMER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_TIMER));
- assertSame(OUTER.getMetrics().get(OUTER_METER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_METER));
- assertSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_TIMER),
- StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_TIMER)));
-
- assertNotSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_METER),
- StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
- assertSame(existingInnerMeter, StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
-
- //Ensure idempotency
- LOG.info("twice register outer set");
- MetricSet newOuter = newMetricSetInstance();
- StormMetricsRegistry.registerMetricSet(newOuter);
- assertSame(OUTER.getMetrics().get(OUTER_TIMER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_TIMER));
- assertSame(OUTER.getMetrics().get(OUTER_METER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_METER));
- assertSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_TIMER),
- StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_TIMER)));
- assertSame(existingInnerMeter, StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
-
- LOG.info("name collision");
- assertThrows(IllegalArgumentException.class, () -> StormMetricsRegistry.registerGauge(name(INNER_SET, INNER_METER), () -> 0));
- }
-
- @Test
- void unregisterMetricSet() {
- StormMetricsRegistry.registerMetricSet(OUTER);
- StormMetricsRegistry.unregisterMetricSet(OUTER);
- assertTrue(StormMetricsRegistry.REGISTRY.getMetrics().isEmpty());
-
- }
-
- private static MetricSet newMetricSetInstance() {
- return new MetricSet() {
- private final MetricSet inner = new MetricSet() {
- private final Map<String, Metric> map = new HashMap<>();
-
- {
- map.put(INNER_METER, new Meter());
- map.put(INNER_TIMER, new Timer());
- }
-
- @Override
- public Map<String, Metric> getMetrics() {
- return map;
- }
- };
- private final Map<String, Metric> outerMap = new HashMap<>();
-
- {
- outerMap.put(OUTER_METER, new Meter());
- outerMap.put(INNER_SET, inner);
- outerMap.put(OUTER_TIMER, new Timer());
- }
-
- @Override
- public Map<String, Metric> getMetrics() {
- return outerMap;
- }
- };
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java
index 63df80a..9609c3a 100644
--- a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java
+++ b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.storm.metric.StormMetricsRegistry;
public class RocksDbStoreTest {
private final static Logger LOG = LoggerFactory.getLogger(RocksDbStoreTest.class);
@@ -57,7 +58,7 @@ public class RocksDbStoreTest {
conf.put(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING, true);
conf.put(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY, 4000);
conf.put(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS, 240);
- store = MetricStoreConfig.configure(conf);
+ store = MetricStoreConfig.configure(conf, new StormMetricsRegistry());
}
@AfterClass
@@ -307,7 +308,7 @@ public class RocksDbStoreTest {
Assert.assertTrue(list.size() >= 2);
// delete anything older than an hour
- MetricsCleaner cleaner = new MetricsCleaner((RocksDbStore)store, 1, 1, null);
+ MetricsCleaner cleaner = new MetricsCleaner((RocksDbStore)store, 1, 1, null, new StormMetricsRegistry());
cleaner.purgeMetrics();
list = getMetricsFromScan(filter);
Assert.assertEquals(1, list.size());
http://git-wip-us.apache.org/repos/asf/storm/blob/8c90f12d/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
index daf8671..9a34080 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/FaultGenerateUtils.java
@@ -27,6 +27,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
public class FaultGenerateUtils {
@@ -58,6 +60,6 @@ public class FaultGenerateUtils {
} else {
assignment = TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments());
}
- return new Cluster(iNimbus, supervisors, assignment, topologies, config);
+ return new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supervisors, assignment, topologies, config);
}
}