You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/03/10 17:35:30 UTC
[ignite] branch master updated: IGNITE-12756 TcpCommunication SPI
metrics improvement
This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 683f22e IGNITE-12756 TcpCommunication SPI metrics improvement
683f22e is described below
commit 683f22e64fe57b2de47cfe36cf40b237ce7aaf8e
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Tue Feb 25 19:55:01 2020 +0300
IGNITE-12756 TcpCommunication SPI metrics improvement
---
.../internal/managers/GridManagerAdapter.java | 18 +++
.../managers/communication/GridIoManager.java | 94 ++++++-------
.../communication/IgniteMessageFactoryImpl.java | 37 ++++-
.../processors/resource/GridResourceIoc.java | 19 +--
.../processors/resource/GridResourceProcessor.java | 9 +-
.../internal/resources/MetricManagerResource.java | 32 -----
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 33 ++++-
.../org/apache/ignite/spi/IgniteSpiContext.java | 36 +++++
.../tcp/TcpCommunicationMetricsListener.java | 156 ++++++++++++---------
.../spi/communication/tcp/TcpCommunicationSpi.java | 29 ++--
.../IgniteMessageFactoryImplTest.java | 47 ++++++-
.../ignite/testframework/GridSpiTestContext.java | 30 +++-
.../testframework/junits/IgniteTestResources.java | 16 +--
13 files changed, 357 insertions(+), 199 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index d2952d9..d10d774 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Consumer;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.expiry.TouchedExpiryPolicy;
@@ -62,6 +63,7 @@ import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
+import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -624,6 +626,22 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
ctx.discovery().resolveCommunicationError(node, err);
}
+ @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
+ return ctx.metric().registry(name);
+ }
+
+ @Override public void removeMetricRegistry(String name) {
+ ctx.metric().remove(name);
+ }
+
+ @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
+ return ctx.metric();
+ }
+
+ @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
+ ctx.metric().addMetricRegistryCreationListener(lsnr);
+ }
+
/**
* @param e Exception to handle.
* @return GridSpiException Converted exception.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index b40a166..4ae64b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -416,53 +416,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
- startSpi();
-
- MetricRegistry ioMetric = ctx.metric().registry(COMM_METRICS);
-
- CommunicationSpi spi = ctx.config().getCommunicationSpi();
-
- ioMetric.register(OUTBOUND_MSG_QUEUE_CNT, spi::getOutboundMessagesQueueSize,
- "Outbound messages queue size.");
-
- ioMetric.register(SENT_MSG_CNT, spi::getSentMessagesCount, "Sent messages count.");
-
- ioMetric.register(SENT_BYTES_CNT, spi::getSentBytesCount, "Sent bytes count.");
-
- ioMetric.register(RCVD_MSGS_CNT, spi::getReceivedMessagesCount,
- "Received messages count.");
-
- ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count.");
-
- getSpi().setListener(commLsnr = new CommunicationListenerEx<Serializable>() {
- @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
- try {
- onMessage0(nodeId, (GridIoMessage)msg, msgC);
- }
- catch (ClassCastException ignored) {
- U.error(log, "Communication manager received message of unknown type (will ignore): " +
- msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
- "which is illegal - make sure to send messages only via GridProjection API.");
- }
- }
-
- @Override public void onDisconnected(UUID nodeId) {
- for (GridDisconnectListener lsnr : disconnectLsnrs)
- lsnr.onNodeDisconnected(nodeId);
- }
-
- @Override public void onChannelOpened(UUID rmtNodeId, Serializable initMsg, Channel channel) {
- try {
- onChannelOpened0(rmtNodeId, (GridIoMessage)initMsg, channel);
- }
- catch (ClassCastException ignored) {
- U.error(log, "Communication manager received message of unknown type (will ignore): " +
- initMsg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
- "which is illegal - make sure to send messages only via GridProjection API.");
- }
- }
- });
-
ctx.addNodeAttribute(DIRECT_PROTO_VER_ATTR, DIRECT_PROTO_VER);
MessageFormatter[] formatterExt = ctx.plugins().extensions(MessageFormatter.class);
@@ -512,6 +465,53 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
msgFactory = new IgniteMessageFactoryImpl(msgs);
+ startSpi();
+
+ MetricRegistry ioMetric = ctx.metric().registry(COMM_METRICS);
+
+ CommunicationSpi spi = ctx.config().getCommunicationSpi();
+
+ ioMetric.register(OUTBOUND_MSG_QUEUE_CNT, spi::getOutboundMessagesQueueSize,
+ "Outbound messages queue size.");
+
+ ioMetric.register(SENT_MSG_CNT, spi::getSentMessagesCount, "Sent messages count.");
+
+ ioMetric.register(SENT_BYTES_CNT, spi::getSentBytesCount, "Sent bytes count.");
+
+ ioMetric.register(RCVD_MSGS_CNT, spi::getReceivedMessagesCount,
+ "Received messages count.");
+
+ ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count.");
+
+ getSpi().setListener(commLsnr = new CommunicationListenerEx<Serializable>() {
+ @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
+ try {
+ onMessage0(nodeId, (GridIoMessage)msg, msgC);
+ }
+ catch (ClassCastException ignored) {
+ U.error(log, "Communication manager received message of unknown type (will ignore): " +
+ msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
+ "which is illegal - make sure to send messages only via GridProjection API.");
+ }
+ }
+
+ @Override public void onDisconnected(UUID nodeId) {
+ for (GridDisconnectListener lsnr : disconnectLsnrs)
+ lsnr.onNodeDisconnected(nodeId);
+ }
+
+ @Override public void onChannelOpened(UUID rmtNodeId, Serializable initMsg, Channel channel) {
+ try {
+ onChannelOpened0(rmtNodeId, (GridIoMessage)initMsg, channel);
+ }
+ catch (ClassCastException ignored) {
+ U.error(log, "Communication manager received message of unknown type (will ignore): " +
+ initMsg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
+ "which is illegal - make sure to send messages only via GridProjection API.");
+ }
+ }
+ });
+
if (log.isDebugEnabled())
log.debug(startInfo());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
index 957ef7c..68ce797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
@@ -45,6 +45,15 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
/** Initialized flag. If {@code true} then new message type couldn't be registered. */
private boolean initialized;
+ /** Min index of registered message supplier. */
+ private int minIdx = Integer.MAX_VALUE;
+
+ /** Max index of registered message supplier. */
+ private int maxIdx = -1;
+
+ /** Count of registered message suppliers. */
+ private int cnt;
+
/**
* Contructor.
*
@@ -97,15 +106,21 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
Supplier<Message> curr = msgSuppliers[idx];
- if (curr == null)
+ if (curr == null) {
msgSuppliers[idx] = supplier;
+
+ minIdx = Math.min(idx, minIdx);
+
+ maxIdx = Math.max(idx, maxIdx);
+
+ cnt++;
+ }
else
throw new IgniteException("Message factory is already registered for direct type: " + directType);
}
/**
* Creates new message instance of provided direct type.
- * <p>
*
* @param directType Message direct type.
* @return Message instance.
@@ -121,6 +136,24 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
}
/**
+ * Returns direct types of all registered messages.
+ *
+ * @return Direct types of all registered messages.
+ */
+ public short[] registeredDirectTypes() {
+ short[] res = new short[cnt];
+
+ if (cnt > 0) {
+ for (int i = minIdx, p = 0; i <= maxIdx; i++) {
+ if (msgSuppliers[i] != null)
+ res[p++] = indexToDirectType(i);
+ }
+ }
+
+ return res;
+ }
+
+ /**
* @param directType Direct type.
*/
private static int directTypeToIndex(short directType) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
index 10af241..af6a75f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
-import org.apache.ignite.internal.resources.MetricManagerResource;
import org.apache.ignite.internal.util.GridLeanIdentitySet;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.F;
@@ -133,7 +132,7 @@ public class GridResourceIoc {
break;
if (dep != null) {
- Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet());
+ Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.newCSet());
classes.add(cls);
@@ -266,7 +265,7 @@ public class GridResourceIoc {
boolean allowImplicitInjection = !GridNoImplicitInjection.class.isAssignableFrom(cls);
- for (Class cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
+ for (Class<?> cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
for (Field field : cls0.getDeclaredFields()) {
Annotation[] fieldAnns = field.getAnnotations();
@@ -274,9 +273,7 @@ public class GridResourceIoc {
T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
if (t2 == null) {
- t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
- new ArrayList<GridResourceField>(),
- new ArrayList<GridResourceMethod>());
+ t2 = new T2<>(new ArrayList<>(), new ArrayList<>());
annMap.put(ann.annotationType(), t2);
}
@@ -299,9 +296,7 @@ public class GridResourceIoc {
T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
if (t2 == null) {
- t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
- new ArrayList<GridResourceField>(),
- new ArrayList<GridResourceMethod>());
+ t2 = new T2<>(new ArrayList<>(), new ArrayList<>());
annMap.put(ann.annotationType(), t2);
}
@@ -515,9 +510,6 @@ public class GridResourceIoc {
CACHE_STORE_SESSION(CacheStoreSessionResource.class),
/** */
- METRIC_MANAGER(MetricManagerResource.class),
-
- /** */
FILESYSTEM_RESOURCE(FileSystemResource.class);
/** */
@@ -541,8 +533,7 @@ public class GridResourceIoc {
ResourceAnnotation.SPRING,
ResourceAnnotation.IGNITE_INSTANCE,
ResourceAnnotation.LOGGER,
- ResourceAnnotation.SERVICE,
- ResourceAnnotation.METRIC_MANAGER
+ ResourceAnnotation.SERVICE
),
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
index 07cd319..fed25c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
@@ -74,8 +74,6 @@ public class GridResourceProcessor extends GridProcessorAdapter {
new GridResourceLoggerInjector(ctx.config().getGridLogger());
injectorByAnnotation[GridResourceIoc.ResourceAnnotation.IGNITE_INSTANCE.ordinal()] =
new GridResourceBasicInjector<>(ctx.grid());
- injectorByAnnotation[GridResourceIoc.ResourceAnnotation.METRIC_MANAGER.ordinal()] =
- new GridResourceSupplierInjector<>(ctx::metric);
}
/** {@inheritDoc} */
@@ -336,6 +334,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
case LOAD_BALANCER:
case TASK_CONTINUOUS_MAPPER:
case CACHE_STORE_SESSION:
+ case FILESYSTEM_RESOURCE:
res = new GridResourceBasicInjector<>(param);
break;
@@ -343,10 +342,6 @@ public class GridResourceProcessor extends GridProcessorAdapter {
res = new GridResourceJobContextInjector((ComputeJobContext)param);
break;
- case FILESYSTEM_RESOURCE:
- res = new GridResourceBasicInjector<>(param);
- break;
-
default:
res = injectorByAnnotation[ann.ordinal()];
break;
@@ -411,7 +406,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
injectToJob(dep, taskCls, obj, ses, jobCtx);
if (obj instanceof GridInternalWrapper) {
- Object usrObj = ((GridInternalWrapper)obj).userObject();
+ Object usrObj = ((GridInternalWrapper<?>)obj).userObject();
if (usrObj != null)
injectToJob(dep, taskCls, usrObj, ses, jobCtx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java b/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java
deleted file mode 100644
index 21cddd4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.resources;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/** */
-@Documented
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.METHOD, ElementType.FIELD})
-public @interface MetricManagerResource {
- // No-op.
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index f378012..9fa83c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -34,6 +35,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -54,9 +56,11 @@ import org.apache.ignite.plugin.security.SecuritySubject;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
/**
@@ -664,8 +668,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
"'IgniteConfiguration.metricsUpdateFrequency' to prevent unnecessary status checking.");
}
// Intentionally compare references using '!=' below
- else if (ignite.configuration().getFailureDetectionTimeout() !=
- IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT)
+ else if (ignite.configuration().getFailureDetectionTimeout() != DFLT_FAILURE_DETECTION_TIMEOUT)
log.warning("Failure detection timeout will be ignored (one of SPI parameters has been set explicitly)");
clientFailureDetectionTimeout = ignite.configuration().getClientFailureDetectionTimeout();
@@ -841,7 +844,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
/** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes() {
- return locNode == null ? Collections.<ClusterNode>emptyList() : Collections.singletonList(locNode);
+ return locNode == null ? Collections.emptyList() : Collections.singletonList(locNode);
}
/** {@inheritDoc} */
@@ -946,7 +949,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
if (!(ignite0 instanceof IgniteKernal))
throw new IgniteSpiException("Wrong Ignite instance is set: " + ignite0);
- ((IgniteKernal)ignite0).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+ ((IgniteEx)ignite0).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
}
/** {@inheritDoc} */
@@ -956,7 +959,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
if (!(ignite0 instanceof IgniteKernal))
throw new IgniteSpiException("Wrong Ignite instance is set: " + ignite0);
- ((IgniteKernal)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+ ((IgniteEx)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
}
/** {@inheritDoc} */
@@ -973,5 +976,25 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
@Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
throw new UnsupportedOperationException();
}
+
+ /** {@inheritDoc} */
+ @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeMetricRegistry(String name) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
+ // No-op.
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index d4402f4..9c08aca 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Consumer;
import javax.cache.CacheException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -28,10 +29,12 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.security.SecuritySubject;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
import org.jetbrains.annotations.Nullable;
/**
@@ -376,4 +379,37 @@ public interface IgniteSpiContext {
* @param err Error.
*/
public void resolveCommunicationFailure(ClusterNode node, Exception err);
+
+ /**
+ * Returns exisiting or newly created instance of metric registry with given name.
+ *
+ * @param name Metric registry name.
+ * @return Exisiting or newly created instance of metric registry.
+ */
+ @IgniteExperimental
+ public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name);
+
+ /**
+ * Removes metric registry with given name.
+ *
+ * @param name Metric registry name.
+ */
+ @IgniteExperimental
+ public void removeMetricRegistry(String name);
+
+ /**
+ * Returns all registered metric registries.
+ *
+ * @return All registered metric registries.
+ */
+ @IgniteExperimental
+ public Iterable<ReadOnlyMetricRegistry> metricRegistries();
+
+ /**
+ * Register listener which will be notified on metric registry creation.
+ *
+ * @param lsnr Listener.
+ */
+ @IgniteExperimental
+ public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index afece1f..86608fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -27,10 +27,15 @@ import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.metric.GridMetricManager;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.util.collection.IntHashMap;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.Metric;
import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
@@ -60,15 +65,15 @@ import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_M
* Statistics for {@link org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi}.
*/
class TcpCommunicationMetricsListener {
- /** Metrics manager. */
- private final GridMetricManager mmgr;
-
- /** Metrics registry. */
- private final org.apache.ignite.internal.processors.metric.MetricRegistry mreg;
+ /** SPI context. */
+ private final IgniteSpiContext spiCtx;
/** Current ignite instance. */
private final Ignite ignite;
+ /** Metrics registry. */
+ private final MetricRegistry mreg;
+
/** All registered metrics. */
private final Set<ThreadMetrics> allMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -81,12 +86,6 @@ class TcpCommunicationMetricsListener {
return metrics;
});
- /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code sentMsgsMetricsByType}. */
- private final Function<Short, LongAdderMetric> sentMsgsCntByTypeMetricFactory;
-
- /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code rcvdMsgsMetricsByType}. */
- private final Function<Short, LongAdderMetric> rcvdMsgsCntByTypeMetricFactory;
-
/** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code #sentMsgsMetricsByConsistentId}. */
private final Function<Object, LongAdderMetric> sentMsgsCntByConsistentIdMetricFactory;
@@ -105,35 +104,37 @@ class TcpCommunicationMetricsListener {
/** Received messages count metric. */
private final LongAdderMetric rcvdMsgsMetric;
+ /** Counters of sent and received messages by direct type. */
+ private final IntMap<IgniteBiTuple<LongAdderMetric, LongAdderMetric>> msgCntrsByType;
+
/** Method to synchronize access to message type map. */
- private final Object msgTypMapMux = new Object();
+ private final Object msgTypeMapMux = new Object();
/** Message type map. */
private volatile Map<Short, String> msgTypeMap;
/** */
- public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite ignite) {
- this.mmgr = mmgr;
+ public TcpCommunicationMetricsListener(Ignite ignite, IgniteSpiContext spiCtx) {
this.ignite = ignite;
+ this.spiCtx = spiCtx;
+
+ mreg = (MetricRegistry)spiCtx.getOrCreateMetricRegistry(COMMUNICATION_METRICS_GROUP_NAME);
- mreg = mmgr.registry(COMMUNICATION_METRICS_GROUP_NAME);
+ msgCntrsByType = createMessageCounters((IgniteMessageFactory)spiCtx.messageFactory());
- sentMsgsCntByTypeMetricFactory = directType -> mreg.longAdderMetric(
- sentMessagesByTypeMetricName(directType),
- SENT_MESSAGES_BY_TYPE_METRIC_DESC
- );
- rcvdMsgsCntByTypeMetricFactory = directType -> mreg.longAdderMetric(
- receivedMessagesByTypeMetricName(directType),
- RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC
- );
+ sentMsgsCntByConsistentIdMetricFactory = consistentId -> {
+ String name = metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString());
- sentMsgsCntByConsistentIdMetricFactory = consistentId ->
- mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()))
- .findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
+ return spiCtx.getOrCreateMetricRegistry(name)
+ .findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
+ };
- rcvdMsgsCntByConsistentIdMetricFactory = consistentId ->
- mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()))
- .findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
+ rcvdMsgsCntByConsistentIdMetricFactory = consistentId -> {
+ String name = metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString());
+
+ return spiCtx.getOrCreateMetricRegistry(name)
+ .findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
+ };
sentBytesMetric = mreg.longAdderMetric(SENT_BYTES_METRIC_NAME, SENT_BYTES_METRIC_DESC);
rcvdBytesMetric = mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME, RECEIVED_BYTES_METRIC_DESC);
@@ -141,17 +142,49 @@ class TcpCommunicationMetricsListener {
sentMsgsMetric = mreg.longAdderMetric(SENT_MESSAGES_METRIC_NAME, SENT_MESSAGES_METRIC_DESC);
rcvdMsgsMetric = mreg.longAdderMetric(RECEIVED_MESSAGES_METRIC_NAME, RECEIVED_MESSAGES_METRIC_DESC);
- mmgr.addMetricRegistryCreationListener(mreg -> {
+ spiCtx.addMetricRegistryCreationListener(mreg -> {
// Metrics for the specific nodes.
if (!mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR))
return;
- ((MetricRegistry)mreg).longAdderMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME, SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC);
+ ((MetricRegistry)mreg).longAdderMetric(
+ SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
+ SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
+ );
- ((MetricRegistry)mreg).longAdderMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME, RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC);
+ ((MetricRegistry)mreg).longAdderMetric(
+ RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
+ RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
+ );
});
}
+ /**
+ * Creates counters of sent and received messages by direct type.
+ *
+ * @param factory Message factory.
+ * @return Counters of sent and received messages grouped by direct type.
+ */
+ private IntMap<IgniteBiTuple<LongAdderMetric, LongAdderMetric>> createMessageCounters(IgniteMessageFactory factory) {
+ IgniteMessageFactoryImpl msgFactory = (IgniteMessageFactoryImpl)factory;
+
+ short[] directTypes = msgFactory.registeredDirectTypes();
+
+ IntMap<IgniteBiTuple<LongAdderMetric, LongAdderMetric>> msgCntrsByType = new IntHashMap<>(directTypes.length);
+
+ for (short type : directTypes) {
+ LongAdderMetric sentCnt =
+ mreg.longAdderMetric(sentMessagesByTypeMetricName(type), SENT_MESSAGES_BY_TYPE_METRIC_DESC);
+
+ LongAdderMetric rcvCnt =
+ mreg.longAdderMetric(receivedMessagesByTypeMetricName(type), RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC);
+
+ msgCntrsByType.put(type, new IgniteBiTuple<>(sentCnt, rcvCnt));
+ }
+
+ return msgCntrsByType;
+ }
+
/** Metrics registry. */
public MetricRegistry metricRegistry() {
return mreg;
@@ -285,10 +318,10 @@ class TcpCommunicationMetricsListener {
if (metric.name().startsWith(prefix)) {
short directType = Short.parseShort(metric.name().substring(prefix.length()));
- Map<Short, String> msgTypMap0 = msgTypeMap;
+ Map<Short, String> msgTypeMap0 = msgTypeMap;
- if (msgTypMap0 != null) {
- String typeName = msgTypMap0.get(directType);
+ if (msgTypeMap0 != null) {
+ String typeName = msgTypeMap0.get(directType);
if (typeName != null)
res.put(typeName, ((LongMetric)metric).value());
@@ -309,7 +342,7 @@ class TcpCommunicationMetricsListener {
String mregPrefix = COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR;
- for (ReadOnlyMetricRegistry mreg : mmgr) {
+ for (ReadOnlyMetricRegistry mreg : spiCtx.metricRegistries()) {
if (mreg.name().startsWith(mregPrefix)) {
String nodeConsIdStr = mreg.name().substring(mregPrefix.length());
@@ -342,7 +375,7 @@ class TcpCommunicationMetricsListener {
metric.reset();
}
- for (ReadOnlyMetricRegistry mreg : mmgr) {
+ for (ReadOnlyMetricRegistry mreg : spiCtx.metricRegistries()) {
if (mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR)) {
mreg.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset();
@@ -363,7 +396,7 @@ class TcpCommunicationMetricsListener {
threadMetrics.rcvdMsgsMetricsByConsistentId = new HashMap<>();
}
- mmgr.remove(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()));
+ spiCtx.removeMetricRegistry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()));
}
/**
@@ -374,24 +407,24 @@ class TcpCommunicationMetricsListener {
private void updateMessageTypeMap(Message msg) {
short typeId = msg.directType();
- Map<Short, String> msgTypMap0 = msgTypeMap;
+ Map<Short, String> msgTypeMap0 = msgTypeMap;
- if (msgTypMap0 == null || !msgTypMap0.containsKey(typeId)) {
- synchronized (msgTypMapMux) {
+ if (msgTypeMap0 == null || !msgTypeMap0.containsKey(typeId)) {
+ synchronized (msgTypeMapMux) {
if (msgTypeMap == null) {
- msgTypMap0 = new HashMap<>();
+ msgTypeMap0 = new HashMap<>();
- msgTypMap0.put(typeId, msg.getClass().getName());
+ msgTypeMap0.put(typeId, msg.getClass().getName());
- msgTypeMap = msgTypMap0;
+ msgTypeMap = msgTypeMap0;
}
else {
if (!msgTypeMap.containsKey(typeId)) {
- msgTypMap0 = new HashMap<>(msgTypeMap);
+ msgTypeMap0 = new HashMap<>(msgTypeMap);
- msgTypMap0.put(typeId, msg.getClass().getName());
+ msgTypeMap0.put(typeId, msg.getClass().getName());
- msgTypeMap = msgTypMap0;
+ msgTypeMap = msgTypeMap0;
}
}
}
@@ -412,29 +445,22 @@ class TcpCommunicationMetricsListener {
* Thread-local metrics.
*/
private class ThreadMetrics {
- /** Sent messages count metrics grouped by message type. */
- private final Map<Short, LongAdderMetric> sentMsgsMetricsByType = new HashMap<>();
-
- /** Received messages count metrics grouped by message type. */
- private final Map<Short, LongAdderMetric> rcvdMsgsMetricsByType = new HashMap<>();
+ /** Sent messages count metrics grouped by message node consistent id. */
+ volatile Map<Object, LongAdderMetric> sentMsgsMetricsByConsistentId = new HashMap<>();
- /**
- * Sent messages count metrics grouped by message node consistent id.
- */
- public volatile Map<Object, LongAdderMetric> sentMsgsMetricsByConsistentId = new HashMap<>();
-
- /**
- * Received messages metrics count grouped by message node consistent id.
- */
- public volatile Map<Object, LongAdderMetric> rcvdMsgsMetricsByConsistentId = new HashMap<>();
+ /** Received messages metrics count grouped by message node consistent id. */
+ volatile Map<Object, LongAdderMetric> rcvdMsgsMetricsByConsistentId = new HashMap<>();
/**
* Collects statistics for message sent by SPI.
+ *
* @param msg Sent message.
* @param consistentId Receiver node consistent id.
*/
private void onMessageSent(Message msg, Object consistentId) {
- sentMsgsMetricsByType.computeIfAbsent(msg.directType(), sentMsgsCntByTypeMetricFactory).increment();
+ IgniteBiTuple<LongAdderMetric, LongAdderMetric> cnts = msgCntrsByType.get(msg.directType());
+
+ cnts.get1().increment();
sentMsgsMetricsByConsistentId.computeIfAbsent(consistentId, sentMsgsCntByConsistentIdMetricFactory).increment();
}
@@ -445,7 +471,9 @@ class TcpCommunicationMetricsListener {
* @param consistentId Sender node consistent id.
*/
private void onMessageReceived(Message msg, Object consistentId) {
- rcvdMsgsMetricsByType.computeIfAbsent(msg.directType(), rcvdMsgsCntByTypeMetricFactory).increment();
+ IgniteBiTuple<LongAdderMetric, LongAdderMetric> cnts = msgCntrsByType.get(msg.directType());
+
+ cnts.get2().increment();
rcvdMsgsMetricsByConsistentId.computeIfAbsent(consistentId, rcvdMsgsCntByConsistentIdMetricFactory).increment();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index ce324ca..7708789 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -75,9 +75,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
-import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
-import org.apache.ignite.internal.resources.MetricManagerResource;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -1399,13 +1397,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
- /** */
- @MetricManagerResource
- private void injectMetricManager(GridMetricManager mmgr) {
- if (mmgr != null)
- metricsLsnr = new TcpCommunicationMetricsListener(mmgr, ignite);
- }
-
/**
* Sets local host address for socket binding. Note that one node could have
* additional addresses beside the loopback one. This configuration
@@ -2039,21 +2030,37 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** {@inheritDoc} */
@Override public int getSentMessagesCount() {
+ // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
+ if (metricsLsnr == null)
+ return 0;
+
return metricsLsnr.sentMessagesCount();
}
/** {@inheritDoc} */
@Override public long getSentBytesCount() {
+ // Listener could be not initialized yet, but discovery thread clould try to aggregate metrics.
+ if (metricsLsnr == null)
+ return 0;
+
return metricsLsnr.sentBytesCount();
}
/** {@inheritDoc} */
@Override public int getReceivedMessagesCount() {
+ // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
+ if (metricsLsnr == null)
+ return 0;
+
return metricsLsnr.receivedMessagesCount();
}
/** {@inheritDoc} */
@Override public long getReceivedBytesCount() {
+ // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
+ if (metricsLsnr == null)
+ return 0;
+
return metricsLsnr.receivedBytesCount();
}
@@ -2430,6 +2437,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
ctxInitLatch.countDown();
+
+ metricsLsnr = new TcpCommunicationMetricsListener(ignite, spiCtx);
}
/** {@inheritDoc} */
@@ -5048,7 +5057,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/**
*
*/
- private class ConnectGateway {
+ private static class ConnectGateway {
/** */
private GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
index c55cc0d..4ad4040 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -41,6 +42,9 @@ public class IgniteMessageFactoryImplTest {
/** Test message 2 type. */
private static final short TEST_MSG_2_TYPE = 2;
+ /** Test message 42 type. */
+ private static final short TEST_MSG_42_TYPE = 42;
+
/** Unknown message type. */
private static final short UNKNOWN_MSG_TYPE = 0;
@@ -63,7 +67,7 @@ public class IgniteMessageFactoryImplTest {
public void testCreate() {
MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
- IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
+ IgniteMessageFactoryImpl msgFactory = new IgniteMessageFactoryImpl(factories);
Message msg;
@@ -73,8 +77,12 @@ public class IgniteMessageFactoryImplTest {
msg = msgFactory.create(TEST_MSG_2_TYPE);
assertTrue(msg instanceof TestMessage2);
- msg = msgFactory.create(TEST_MSG_2_TYPE);
- assertTrue(msg instanceof TestMessage2);
+ msg = msgFactory.create(TEST_MSG_42_TYPE);
+ assertTrue(msg instanceof TestMessage42);
+
+ short[] directTypes = msgFactory.registeredDirectTypes();
+
+ assertArrayEquals(directTypes, new short[] {TEST_MSG_1_TYPE, TEST_MSG_2_TYPE, TEST_MSG_42_TYPE});
}
/**
@@ -111,6 +119,7 @@ public class IgniteMessageFactoryImplTest {
/** {@inheritDoc} */
@Override public void registerAll(IgniteMessageFactory factory) {
factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
+ factory.register(TEST_MSG_42_TYPE, TestMessage42::new);
}
}
@@ -154,7 +163,7 @@ public class IgniteMessageFactoryImplTest {
/** {@inheritDoc} */
@Override public short directType() {
- return 1;
+ return TEST_MSG_1_TYPE;
}
/** {@inheritDoc} */
@@ -182,7 +191,35 @@ public class IgniteMessageFactoryImplTest {
/** {@inheritDoc} */
@Override public short directType() {
- return 2;
+ return TEST_MSG_2_TYPE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+ }
+
+ /** Test message. */
+ private static class TestMessage42 implements Message {
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TEST_MSG_42_TYPE;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index fdea2b0..a15d7a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
@@ -45,12 +46,14 @@ import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -62,6 +65,7 @@ import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -90,7 +94,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
private final Map<ClusterNode, Serializable> sentMsgs = new HashMap<>();
/** */
- private final ConcurrentMap<String, Map> cache = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Map<?, ?>> cache = new ConcurrentHashMap<>();
/** */
private MessageFormatter formatter;
@@ -109,6 +113,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
}
/** {@inheritDoc} */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
@Override public Collection<ClusterNode> remoteNodes() {
return rmtNodes;
}
@@ -611,14 +616,33 @@ public class GridSpiTestContext implements IgniteSpiContext {
throw new UnsupportedOperationException();
}
+ /** {@inheritDoc} */
+ @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
+ return new MetricRegistry(name, null, null, new NullLogger());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeMetricRegistry(String name) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
+ // No-op.
+ }
+
/**
* @param cacheName Cache name.
* @return Map representing cache.
*/
- @SuppressWarnings("unchecked")
private <K, V> Map<K, V> getOrCreateCache(String cacheName) {
synchronized (cache) {
- Map<K, V> map = cache.get(cacheName);
+ Map<K, V> map = (Map<K, V>)cache.get(cacheName);
if (map == null)
cache.put(cacheName, map = new ConcurrentHashMap<>());
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
index a154fe5..f935e05 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
-import org.apache.ignite.internal.resources.MetricManagerResource;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.logger.NullLogger;
@@ -88,10 +87,8 @@ public class IgniteTestResources {
return U.IGNITE_MBEANS_DISABLED ? null : ManagementFactory.getPlatformMBeanServer();
}
- /**
- * @throws IgniteCheckedException If failed.
- */
- public IgniteTestResources() throws IgniteCheckedException {
+ /** */
+ public IgniteTestResources() {
if (SensitiveInfoTestLoggerProxy.TEST_SENSITIVE)
log = new SensitiveInfoTestLoggerProxy(rootLog.getLogger(getClass()), null, null, null);
else
@@ -107,7 +104,7 @@ public class IgniteTestResources {
/**
* @param cfg Ignite configuration
*/
- public IgniteTestResources(IgniteConfiguration cfg) throws IgniteCheckedException {
+ public IgniteTestResources(IgniteConfiguration cfg) {
this.cfg = cfg;
this.log = rootLog.getLogger(getClass());
this.jmx = prepareMBeanServer();
@@ -118,7 +115,7 @@ public class IgniteTestResources {
/**
* @param jmx JMX server.
*/
- public IgniteTestResources(MBeanServer jmx) throws IgniteCheckedException {
+ public IgniteTestResources(MBeanServer jmx) {
assert jmx != null;
this.jmx = jmx;
@@ -130,7 +127,7 @@ public class IgniteTestResources {
/**
* @param log Logger.
*/
- public IgniteTestResources(IgniteLogger log) throws IgniteCheckedException {
+ public IgniteTestResources(IgniteLogger log) {
assert log != null;
this.log = log.getLogger(getClass());
@@ -167,7 +164,7 @@ public class IgniteTestResources {
*/
public void startThreads(boolean prestart) {
execSvc = new IgniteThreadPoolExecutor(nodeId.toString(), null, 40, 40, Long.MAX_VALUE,
- new LinkedBlockingQueue<Runnable>());
+ new LinkedBlockingQueue<>());
// Improve concurrency for testing.
if (prestart)
@@ -194,7 +191,6 @@ public class IgniteTestResources {
rsrcProc.injectBasicResource(target, LoggerResource.class, getLogger().getLogger(target.getClass()));
rsrcProc.injectBasicResource(target, IgniteInstanceResource.class,
new IgniteMock(null, locHost, nodeId, getMarshaller(), jmx, home, cfg));
- rsrcProc.injectBasicResource(target, MetricManagerResource.class, ctx.metric());
}
/**