You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/08/31 16:24:54 UTC
[ignite] 01/03: Revert "IGNITE-12756 TcpCommunication SPI metrics
improvement"
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch ignite-2.9-revert-12568
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit f1fcea2e8f788a3039fe91218b990c21c778f238
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Mon Aug 31 18:59:34 2020 +0300
Revert "IGNITE-12756 TcpCommunication SPI metrics improvement"
This reverts commit 683f22e6
---
.../internal/managers/GridManagerAdapter.java | 18 ---
.../managers/communication/GridIoManager.java | 100 ++++++-------
.../communication/IgniteMessageFactoryImpl.java | 37 +----
.../processors/resource/GridResourceIoc.java | 21 ++-
.../processors/resource/GridResourceProcessor.java | 4 +-
.../internal/resources/MetricManagerResource.java | 32 +++++
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 33 +----
.../org/apache/ignite/spi/IgniteSpiContext.java | 36 -----
.../tcp/TcpCommunicationMetricsListener.java | 156 +++++++++------------
.../spi/communication/tcp/TcpCommunicationSpi.java | 29 ++--
.../IgniteMessageFactoryImplTest.java | 47 +------
.../ignite/testframework/GridSpiTestContext.java | 30 +---
.../testframework/junits/IgniteTestResources.java | 16 ++-
13 files changed, 199 insertions(+), 360 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index d10d774..d2952d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.function.Consumer;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.expiry.TouchedExpiryPolicy;
@@ -63,7 +62,6 @@ import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
-import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -626,22 +624,6 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
ctx.discovery().resolveCommunicationError(node, err);
}
- @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
- return ctx.metric().registry(name);
- }
-
- @Override public void removeMetricRegistry(String name) {
- ctx.metric().remove(name);
- }
-
- @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
- return ctx.metric();
- }
-
- @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
- ctx.metric().addMetricRegistryCreationListener(lsnr);
- }
-
/**
* @param e Exception to handle.
* @return GridSpiException Converted exception.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 25581ce..552613f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -432,6 +432,56 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
+ CommunicationSpi<Serializable> spi = getSpi();
+
+ if ((CommunicationSpi<?>)spi instanceof TcpCommunicationSpi)
+ getTcpCommunicationSpi().setConnectionRequestor(invConnHandler);
+
+ startSpi();
+
+ MetricRegistry ioMetric = ctx.metric().registry(COMM_METRICS);
+
+ ioMetric.register(OUTBOUND_MSG_QUEUE_CNT, spi::getOutboundMessagesQueueSize,
+ "Outbound messages queue size.");
+
+ ioMetric.register(SENT_MSG_CNT, spi::getSentMessagesCount, "Sent messages count.");
+
+ ioMetric.register(SENT_BYTES_CNT, spi::getSentBytesCount, "Sent bytes count.");
+
+ ioMetric.register(RCVD_MSGS_CNT, spi::getReceivedMessagesCount,
+ "Received messages count.");
+
+ ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count.");
+
+ getSpi().setListener(commLsnr = new CommunicationListenerEx<Serializable>() {
+ @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
+ try {
+ onMessage0(nodeId, (GridIoMessage)msg, msgC);
+ }
+ catch (ClassCastException ignored) {
+ U.error(log, "Communication manager received message of unknown type (will ignore): " +
+ msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
+ "which is illegal - make sure to send messages only via GridProjection API.");
+ }
+ }
+
+ @Override public void onDisconnected(UUID nodeId) {
+ for (GridDisconnectListener lsnr : disconnectLsnrs)
+ lsnr.onNodeDisconnected(nodeId);
+ }
+
+ @Override public void onChannelOpened(UUID rmtNodeId, Serializable initMsg, Channel channel) {
+ try {
+ onChannelOpened0(rmtNodeId, (GridIoMessage)initMsg, channel);
+ }
+ catch (ClassCastException ignored) {
+ U.error(log, "Communication manager received message of unknown type (will ignore): " +
+ initMsg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
+ "which is illegal - make sure to send messages only via GridProjection API.");
+ }
+ }
+ });
+
ctx.addNodeAttribute(DIRECT_PROTO_VER_ATTR, DIRECT_PROTO_VER);
MessageFormatter[] formatterExt = ctx.plugins().extensions(MessageFormatter.class);
@@ -481,56 +531,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
msgFactory = new IgniteMessageFactoryImpl(msgs);
- CommunicationSpi<Serializable> spi = getSpi();
-
- if ((CommunicationSpi<?>)spi instanceof TcpCommunicationSpi)
- getTcpCommunicationSpi().setConnectionRequestor(invConnHandler);
-
- startSpi();
-
- MetricRegistry ioMetric = ctx.metric().registry(COMM_METRICS);
-
- ioMetric.register(OUTBOUND_MSG_QUEUE_CNT, spi::getOutboundMessagesQueueSize,
- "Outbound messages queue size.");
-
- ioMetric.register(SENT_MSG_CNT, spi::getSentMessagesCount, "Sent messages count.");
-
- ioMetric.register(SENT_BYTES_CNT, spi::getSentBytesCount, "Sent bytes count.");
-
- ioMetric.register(RCVD_MSGS_CNT, spi::getReceivedMessagesCount,
- "Received messages count.");
-
- ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count.");
-
- getSpi().setListener(commLsnr = new CommunicationListenerEx<Serializable>() {
- @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
- try {
- onMessage0(nodeId, (GridIoMessage)msg, msgC);
- }
- catch (ClassCastException ignored) {
- U.error(log, "Communication manager received message of unknown type (will ignore): " +
- msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
- "which is illegal - make sure to send messages only via GridProjection API.");
- }
- }
-
- @Override public void onDisconnected(UUID nodeId) {
- for (GridDisconnectListener lsnr : disconnectLsnrs)
- lsnr.onNodeDisconnected(nodeId);
- }
-
- @Override public void onChannelOpened(UUID rmtNodeId, Serializable initMsg, Channel channel) {
- try {
- onChannelOpened0(rmtNodeId, (GridIoMessage)initMsg, channel);
- }
- catch (ClassCastException ignored) {
- U.error(log, "Communication manager received message of unknown type (will ignore): " +
- initMsg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
- "which is illegal - make sure to send messages only via GridProjection API.");
- }
- }
- });
-
if (log.isDebugEnabled())
log.debug(startInfo());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
index 68ce797..957ef7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
@@ -45,15 +45,6 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
/** Initialized flag. If {@code true} then new message type couldn't be registered. */
private boolean initialized;
- /** Min index of registered message supplier. */
- private int minIdx = Integer.MAX_VALUE;
-
- /** Max index of registered message supplier. */
- private int maxIdx = -1;
-
- /** Count of registered message suppliers. */
- private int cnt;
-
/**
* Contructor.
*
@@ -106,21 +97,15 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
Supplier<Message> curr = msgSuppliers[idx];
- if (curr == null) {
+ if (curr == null)
msgSuppliers[idx] = supplier;
-
- minIdx = Math.min(idx, minIdx);
-
- maxIdx = Math.max(idx, maxIdx);
-
- cnt++;
- }
else
throw new IgniteException("Message factory is already registered for direct type: " + directType);
}
/**
* Creates new message instance of provided direct type.
+ * <p>
*
* @param directType Message direct type.
* @return Message instance.
@@ -136,24 +121,6 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
}
/**
- * Returns direct types of all registered messages.
- *
- * @return Direct types of all registered messages.
- */
- public short[] registeredDirectTypes() {
- short[] res = new short[cnt];
-
- if (cnt > 0) {
- for (int i = minIdx, p = 0; i <= maxIdx; i++) {
- if (msgSuppliers[i] != null)
- res[p++] = indexToDirectType(i);
- }
- }
-
- return res;
- }
-
- /**
* @param directType Direct type.
*/
private static int directTypeToIndex(short directType) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
index 4b43e04..cecaf7e86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.resources.MetricManagerResource;
import org.apache.ignite.internal.util.GridLeanIdentitySet;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.F;
@@ -131,7 +132,7 @@ public class GridResourceIoc {
break;
if (dep != null) {
- Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.newCSet());
+ Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet());
classes.add(cls);
@@ -264,7 +265,7 @@ public class GridResourceIoc {
boolean allowImplicitInjection = !GridNoImplicitInjection.class.isAssignableFrom(cls);
- for (Class<?> cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
+ for (Class cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
for (Field field : cls0.getDeclaredFields()) {
Annotation[] fieldAnns = field.getAnnotations();
@@ -272,7 +273,9 @@ public class GridResourceIoc {
T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
if (t2 == null) {
- t2 = new T2<>(new ArrayList<>(), new ArrayList<>());
+ t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
+ new ArrayList<GridResourceField>(),
+ new ArrayList<GridResourceMethod>());
annMap.put(ann.annotationType(), t2);
}
@@ -295,7 +298,9 @@ public class GridResourceIoc {
T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
if (t2 == null) {
- t2 = new T2<>(new ArrayList<>(), new ArrayList<>());
+ t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
+ new ArrayList<GridResourceField>(),
+ new ArrayList<GridResourceMethod>());
annMap.put(ann.annotationType(), t2);
}
@@ -506,7 +511,10 @@ public class GridResourceIoc {
JOB_CONTEXT(JobContextResource.class),
/** */
- CACHE_STORE_SESSION(CacheStoreSessionResource.class);
+ CACHE_STORE_SESSION(CacheStoreSessionResource.class),
+
+ /** */
+ METRIC_MANAGER(MetricManagerResource.class);
/** */
public final Class<? extends Annotation> clazz;
@@ -529,7 +537,8 @@ public class GridResourceIoc {
ResourceAnnotation.SPRING,
ResourceAnnotation.IGNITE_INSTANCE,
ResourceAnnotation.LOGGER,
- ResourceAnnotation.SERVICE
+ ResourceAnnotation.SERVICE,
+ ResourceAnnotation.METRIC_MANAGER
),
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
index 461b8d9..ca51984 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
@@ -73,6 +73,8 @@ public class GridResourceProcessor extends GridProcessorAdapter {
new GridResourceLoggerInjector(ctx.config().getGridLogger());
injectorByAnnotation[GridResourceIoc.ResourceAnnotation.IGNITE_INSTANCE.ordinal()] =
new GridResourceBasicInjector<>(ctx.grid());
+ injectorByAnnotation[GridResourceIoc.ResourceAnnotation.METRIC_MANAGER.ordinal()] =
+ new GridResourceSupplierInjector<>(ctx::metric);
}
/** {@inheritDoc} */
@@ -391,7 +393,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
injectToJob(dep, taskCls, obj, ses, jobCtx);
if (obj instanceof GridInternalWrapper) {
- Object usrObj = ((GridInternalWrapper<?>)obj).userObject();
+ Object usrObj = ((GridInternalWrapper)obj).userObject();
if (usrObj != null)
injectToJob(dep, taskCls, usrObj, ses, jobCtx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java b/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java
new file mode 100644
index 0000000..21cddd4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.resources;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/** */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.FIELD})
+public @interface MetricManagerResource {
+ // No-op.
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index c2bb86b..102e5e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -35,7 +34,6 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -56,11 +54,9 @@ import org.apache.ignite.plugin.security.SecuritySubject;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
-import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
-import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
/**
@@ -665,7 +661,8 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
"'IgniteConfiguration.metricsUpdateFrequency' to prevent unnecessary status checking.");
}
// Intentionally compare references using '!=' below
- else if (ignite.configuration().getFailureDetectionTimeout() != DFLT_FAILURE_DETECTION_TIMEOUT)
+ else if (ignite.configuration().getFailureDetectionTimeout() !=
+ IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT)
log.warning("Failure detection timeout will be ignored (one of SPI parameters has been set explicitly)");
clientFailureDetectionTimeout = ignite.configuration().getClientFailureDetectionTimeout();
@@ -841,7 +838,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
/** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes() {
- return locNode == null ? Collections.emptyList() : Collections.singletonList(locNode);
+ return locNode == null ? Collections.<ClusterNode>emptyList() : Collections.singletonList(locNode);
}
/** {@inheritDoc} */
@@ -946,7 +943,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
if (!(ignite0 instanceof IgniteKernal))
throw new IgniteSpiException("Wrong Ignite instance is set: " + ignite0);
- ((IgniteEx)ignite0).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+ ((IgniteKernal)ignite0).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
}
/** {@inheritDoc} */
@@ -956,7 +953,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
if (!(ignite0 instanceof IgniteKernal))
throw new IgniteSpiException("Wrong Ignite instance is set: " + ignite0);
- ((IgniteEx)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+ ((IgniteKernal)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
}
/** {@inheritDoc} */
@@ -973,25 +970,5 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
@Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
throw new UnsupportedOperationException();
}
-
- /** {@inheritDoc} */
- @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void removeMetricRegistry(String name) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
- // No-op.
- }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 9c08aca..d4402f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
-import java.util.function.Consumer;
import javax.cache.CacheException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -29,12 +28,10 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.security.SecuritySubject;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
-import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
import org.jetbrains.annotations.Nullable;
/**
@@ -379,37 +376,4 @@ public interface IgniteSpiContext {
* @param err Error.
*/
public void resolveCommunicationFailure(ClusterNode node, Exception err);
-
- /**
- * Returns exisiting or newly created instance of metric registry with given name.
- *
- * @param name Metric registry name.
- * @return Exisiting or newly created instance of metric registry.
- */
- @IgniteExperimental
- public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name);
-
- /**
- * Removes metric registry with given name.
- *
- * @param name Metric registry name.
- */
- @IgniteExperimental
- public void removeMetricRegistry(String name);
-
- /**
- * Returns all registered metric registries.
- *
- * @return All registered metric registries.
- */
- @IgniteExperimental
- public Iterable<ReadOnlyMetricRegistry> metricRegistries();
-
- /**
- * Register listener which will be notified on metric registry creation.
- *
- * @param lsnr Listener.
- */
- @IgniteExperimental
- public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index 86608fc..afece1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -27,15 +27,10 @@ import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
-import org.apache.ignite.internal.util.collection.IntHashMap;
-import org.apache.ignite.internal.util.collection.IntMap;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.Metric;
import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
@@ -65,15 +60,15 @@ import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_M
* Statistics for {@link org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi}.
*/
class TcpCommunicationMetricsListener {
- /** SPI context. */
- private final IgniteSpiContext spiCtx;
+ /** Metrics manager. */
+ private final GridMetricManager mmgr;
+
+ /** Metrics registry. */
+ private final org.apache.ignite.internal.processors.metric.MetricRegistry mreg;
/** Current ignite instance. */
private final Ignite ignite;
- /** Metrics registry. */
- private final MetricRegistry mreg;
-
/** All registered metrics. */
private final Set<ThreadMetrics> allMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -86,6 +81,12 @@ class TcpCommunicationMetricsListener {
return metrics;
});
+ /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code sentMsgsMetricsByType}. */
+ private final Function<Short, LongAdderMetric> sentMsgsCntByTypeMetricFactory;
+
+ /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code rcvdMsgsMetricsByType}. */
+ private final Function<Short, LongAdderMetric> rcvdMsgsCntByTypeMetricFactory;
+
/** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code #sentMsgsMetricsByConsistentId}. */
private final Function<Object, LongAdderMetric> sentMsgsCntByConsistentIdMetricFactory;
@@ -104,37 +105,35 @@ class TcpCommunicationMetricsListener {
/** Received messages count metric. */
private final LongAdderMetric rcvdMsgsMetric;
- /** Counters of sent and received messages by direct type. */
- private final IntMap<IgniteBiTuple<LongAdderMetric, LongAdderMetric>> msgCntrsByType;
-
/** Method to synchronize access to message type map. */
- private final Object msgTypeMapMux = new Object();
+ private final Object msgTypMapMux = new Object();
/** Message type map. */
private volatile Map<Short, String> msgTypeMap;
/** */
- public TcpCommunicationMetricsListener(Ignite ignite, IgniteSpiContext spiCtx) {
+ public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite ignite) {
+ this.mmgr = mmgr;
this.ignite = ignite;
- this.spiCtx = spiCtx;
-
- mreg = (MetricRegistry)spiCtx.getOrCreateMetricRegistry(COMMUNICATION_METRICS_GROUP_NAME);
- msgCntrsByType = createMessageCounters((IgniteMessageFactory)spiCtx.messageFactory());
+ mreg = mmgr.registry(COMMUNICATION_METRICS_GROUP_NAME);
- sentMsgsCntByConsistentIdMetricFactory = consistentId -> {
- String name = metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString());
+ sentMsgsCntByTypeMetricFactory = directType -> mreg.longAdderMetric(
+ sentMessagesByTypeMetricName(directType),
+ SENT_MESSAGES_BY_TYPE_METRIC_DESC
+ );
+ rcvdMsgsCntByTypeMetricFactory = directType -> mreg.longAdderMetric(
+ receivedMessagesByTypeMetricName(directType),
+ RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC
+ );
- return spiCtx.getOrCreateMetricRegistry(name)
- .findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
- };
+ sentMsgsCntByConsistentIdMetricFactory = consistentId ->
+ mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()))
+ .findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
- rcvdMsgsCntByConsistentIdMetricFactory = consistentId -> {
- String name = metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString());
-
- return spiCtx.getOrCreateMetricRegistry(name)
- .findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
- };
+ rcvdMsgsCntByConsistentIdMetricFactory = consistentId ->
+ mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()))
+ .findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
sentBytesMetric = mreg.longAdderMetric(SENT_BYTES_METRIC_NAME, SENT_BYTES_METRIC_DESC);
rcvdBytesMetric = mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME, RECEIVED_BYTES_METRIC_DESC);
@@ -142,49 +141,17 @@ class TcpCommunicationMetricsListener {
sentMsgsMetric = mreg.longAdderMetric(SENT_MESSAGES_METRIC_NAME, SENT_MESSAGES_METRIC_DESC);
rcvdMsgsMetric = mreg.longAdderMetric(RECEIVED_MESSAGES_METRIC_NAME, RECEIVED_MESSAGES_METRIC_DESC);
- spiCtx.addMetricRegistryCreationListener(mreg -> {
+ mmgr.addMetricRegistryCreationListener(mreg -> {
// Metrics for the specific nodes.
if (!mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR))
return;
- ((MetricRegistry)mreg).longAdderMetric(
- SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
- SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
- );
+ ((MetricRegistry)mreg).longAdderMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME, SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC);
- ((MetricRegistry)mreg).longAdderMetric(
- RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
- RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
- );
+ ((MetricRegistry)mreg).longAdderMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME, RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC);
});
}
- /**
- * Creates counters of sent and received messages by direct type.
- *
- * @param factory Message factory.
- * @return Counters of sent and received messages grouped by direct type.
- */
- private IntMap<IgniteBiTuple<LongAdderMetric, LongAdderMetric>> createMessageCounters(IgniteMessageFactory factory) {
- IgniteMessageFactoryImpl msgFactory = (IgniteMessageFactoryImpl)factory;
-
- short[] directTypes = msgFactory.registeredDirectTypes();
-
- IntMap<IgniteBiTuple<LongAdderMetric, LongAdderMetric>> msgCntrsByType = new IntHashMap<>(directTypes.length);
-
- for (short type : directTypes) {
- LongAdderMetric sentCnt =
- mreg.longAdderMetric(sentMessagesByTypeMetricName(type), SENT_MESSAGES_BY_TYPE_METRIC_DESC);
-
- LongAdderMetric rcvCnt =
- mreg.longAdderMetric(receivedMessagesByTypeMetricName(type), RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC);
-
- msgCntrsByType.put(type, new IgniteBiTuple<>(sentCnt, rcvCnt));
- }
-
- return msgCntrsByType;
- }
-
/** Metrics registry. */
public MetricRegistry metricRegistry() {
return mreg;
@@ -318,10 +285,10 @@ class TcpCommunicationMetricsListener {
if (metric.name().startsWith(prefix)) {
short directType = Short.parseShort(metric.name().substring(prefix.length()));
- Map<Short, String> msgTypeMap0 = msgTypeMap;
+ Map<Short, String> msgTypMap0 = msgTypeMap;
- if (msgTypeMap0 != null) {
- String typeName = msgTypeMap0.get(directType);
+ if (msgTypMap0 != null) {
+ String typeName = msgTypMap0.get(directType);
if (typeName != null)
res.put(typeName, ((LongMetric)metric).value());
@@ -342,7 +309,7 @@ class TcpCommunicationMetricsListener {
String mregPrefix = COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR;
- for (ReadOnlyMetricRegistry mreg : spiCtx.metricRegistries()) {
+ for (ReadOnlyMetricRegistry mreg : mmgr) {
if (mreg.name().startsWith(mregPrefix)) {
String nodeConsIdStr = mreg.name().substring(mregPrefix.length());
@@ -375,7 +342,7 @@ class TcpCommunicationMetricsListener {
metric.reset();
}
- for (ReadOnlyMetricRegistry mreg : spiCtx.metricRegistries()) {
+ for (ReadOnlyMetricRegistry mreg : mmgr) {
if (mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR)) {
mreg.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset();
@@ -396,7 +363,7 @@ class TcpCommunicationMetricsListener {
threadMetrics.rcvdMsgsMetricsByConsistentId = new HashMap<>();
}
- spiCtx.removeMetricRegistry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()));
+ mmgr.remove(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()));
}
/**
@@ -407,24 +374,24 @@ class TcpCommunicationMetricsListener {
private void updateMessageTypeMap(Message msg) {
short typeId = msg.directType();
- Map<Short, String> msgTypeMap0 = msgTypeMap;
+ Map<Short, String> msgTypMap0 = msgTypeMap;
- if (msgTypeMap0 == null || !msgTypeMap0.containsKey(typeId)) {
- synchronized (msgTypeMapMux) {
+ if (msgTypMap0 == null || !msgTypMap0.containsKey(typeId)) {
+ synchronized (msgTypMapMux) {
if (msgTypeMap == null) {
- msgTypeMap0 = new HashMap<>();
+ msgTypMap0 = new HashMap<>();
- msgTypeMap0.put(typeId, msg.getClass().getName());
+ msgTypMap0.put(typeId, msg.getClass().getName());
- msgTypeMap = msgTypeMap0;
+ msgTypeMap = msgTypMap0;
}
else {
if (!msgTypeMap.containsKey(typeId)) {
- msgTypeMap0 = new HashMap<>(msgTypeMap);
+ msgTypMap0 = new HashMap<>(msgTypeMap);
- msgTypeMap0.put(typeId, msg.getClass().getName());
+ msgTypMap0.put(typeId, msg.getClass().getName());
- msgTypeMap = msgTypeMap0;
+ msgTypeMap = msgTypMap0;
}
}
}
@@ -445,22 +412,29 @@ class TcpCommunicationMetricsListener {
* Thread-local metrics.
*/
private class ThreadMetrics {
- /** Sent messages count metrics grouped by message node consistent id. */
- volatile Map<Object, LongAdderMetric> sentMsgsMetricsByConsistentId = new HashMap<>();
+ /** Sent messages count metrics grouped by message type. */
+ private final Map<Short, LongAdderMetric> sentMsgsMetricsByType = new HashMap<>();
+
+ /** Received messages count metrics grouped by message type. */
+ private final Map<Short, LongAdderMetric> rcvdMsgsMetricsByType = new HashMap<>();
- /** Received messages metrics count grouped by message node consistent id. */
- volatile Map<Object, LongAdderMetric> rcvdMsgsMetricsByConsistentId = new HashMap<>();
+ /**
+ * Sent messages count metrics grouped by message node consistent id.
+ */
+ public volatile Map<Object, LongAdderMetric> sentMsgsMetricsByConsistentId = new HashMap<>();
+
+ /**
+ * Received messages metrics count grouped by message node consistent id.
+ */
+ public volatile Map<Object, LongAdderMetric> rcvdMsgsMetricsByConsistentId = new HashMap<>();
/**
* Collects statistics for message sent by SPI.
- *
* @param msg Sent message.
* @param consistentId Receiver node consistent id.
*/
private void onMessageSent(Message msg, Object consistentId) {
- IgniteBiTuple<LongAdderMetric, LongAdderMetric> cnts = msgCntrsByType.get(msg.directType());
-
- cnts.get1().increment();
+ sentMsgsMetricsByType.computeIfAbsent(msg.directType(), sentMsgsCntByTypeMetricFactory).increment();
sentMsgsMetricsByConsistentId.computeIfAbsent(consistentId, sentMsgsCntByConsistentIdMetricFactory).increment();
}
@@ -471,9 +445,7 @@ class TcpCommunicationMetricsListener {
* @param consistentId Sender node consistent id.
*/
private void onMessageReceived(Message msg, Object consistentId) {
- IgniteBiTuple<LongAdderMetric, LongAdderMetric> cnts = msgCntrsByType.get(msg.directType());
-
- cnts.get2().increment();
+ rcvdMsgsMetricsByType.computeIfAbsent(msg.directType(), rcvdMsgsCntByTypeMetricFactory).increment();
rcvdMsgsMetricsByConsistentId.computeIfAbsent(consistentId, rcvdMsgsCntByConsistentIdMetricFactory).increment();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 59f22ae..862f149 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -77,11 +77,13 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.SpanTags;
import org.apache.ignite.internal.processors.tracing.Tracing;
+import org.apache.ignite.internal.resources.MetricManagerResource;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -1422,6 +1424,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
+ /** */
+ @MetricManagerResource
+ private void injectMetricManager(GridMetricManager mmgr) {
+ if (mmgr != null)
+ metricsLsnr = new TcpCommunicationMetricsListener(mmgr, ignite);
+ }
+
/**
* Sets local host address for socket binding. Note that one node could have
* additional addresses beside the loopback one. This configuration
@@ -2085,37 +2094,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/** {@inheritDoc} */
@Override public int getSentMessagesCount() {
- // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
- if (metricsLsnr == null)
- return 0;
-
return metricsLsnr.sentMessagesCount();
}
/** {@inheritDoc} */
@Override public long getSentBytesCount() {
- // Listener could be not initialized yet, but discovery thread clould try to aggregate metrics.
- if (metricsLsnr == null)
- return 0;
-
return metricsLsnr.sentBytesCount();
}
/** {@inheritDoc} */
@Override public int getReceivedMessagesCount() {
- // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
- if (metricsLsnr == null)
- return 0;
-
return metricsLsnr.receivedMessagesCount();
}
/** {@inheritDoc} */
@Override public long getReceivedBytesCount() {
- // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
- if (metricsLsnr == null)
- return 0;
-
return metricsLsnr.receivedBytesCount();
}
@@ -2504,8 +2497,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
ctxInitLatch.countDown();
-
- metricsLsnr = new TcpCommunicationMetricsListener(ignite, spiCtx);
}
/** {@inheritDoc} */
@@ -5323,7 +5314,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
/**
*
*/
- private static class ConnectGateway {
+ private class ConnectGateway {
/** */
private GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
index 4ad4040..c55cc0d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
@@ -29,7 +29,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -42,9 +41,6 @@ public class IgniteMessageFactoryImplTest {
/** Test message 2 type. */
private static final short TEST_MSG_2_TYPE = 2;
- /** Test message 42 type. */
- private static final short TEST_MSG_42_TYPE = 42;
-
/** Unknown message type. */
private static final short UNKNOWN_MSG_TYPE = 0;
@@ -67,7 +63,7 @@ public class IgniteMessageFactoryImplTest {
public void testCreate() {
MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
- IgniteMessageFactoryImpl msgFactory = new IgniteMessageFactoryImpl(factories);
+ IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
Message msg;
@@ -77,12 +73,8 @@ public class IgniteMessageFactoryImplTest {
msg = msgFactory.create(TEST_MSG_2_TYPE);
assertTrue(msg instanceof TestMessage2);
- msg = msgFactory.create(TEST_MSG_42_TYPE);
- assertTrue(msg instanceof TestMessage42);
-
- short[] directTypes = msgFactory.registeredDirectTypes();
-
- assertArrayEquals(directTypes, new short[] {TEST_MSG_1_TYPE, TEST_MSG_2_TYPE, TEST_MSG_42_TYPE});
+ msg = msgFactory.create(TEST_MSG_2_TYPE);
+ assertTrue(msg instanceof TestMessage2);
}
/**
@@ -119,7 +111,6 @@ public class IgniteMessageFactoryImplTest {
/** {@inheritDoc} */
@Override public void registerAll(IgniteMessageFactory factory) {
factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
- factory.register(TEST_MSG_42_TYPE, TestMessage42::new);
}
}
@@ -163,7 +154,7 @@ public class IgniteMessageFactoryImplTest {
/** {@inheritDoc} */
@Override public short directType() {
- return TEST_MSG_1_TYPE;
+ return 1;
}
/** {@inheritDoc} */
@@ -191,35 +182,7 @@ public class IgniteMessageFactoryImplTest {
/** {@inheritDoc} */
@Override public short directType() {
- return TEST_MSG_2_TYPE;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
- }
-
- /** Test message. */
- private static class TestMessage42 implements Message {
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return TEST_MSG_42_TYPE;
+ return 2;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index a15d7a0..fdea2b0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -29,7 +29,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import java.util.function.Consumer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
@@ -46,14 +45,12 @@ import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -65,7 +62,6 @@ import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
-import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -94,7 +90,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
private final Map<ClusterNode, Serializable> sentMsgs = new HashMap<>();
/** */
- private final ConcurrentMap<String, Map<?, ?>> cache = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Map> cache = new ConcurrentHashMap<>();
/** */
private MessageFormatter formatter;
@@ -113,7 +109,6 @@ public class GridSpiTestContext implements IgniteSpiContext {
}
/** {@inheritDoc} */
- @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
@Override public Collection<ClusterNode> remoteNodes() {
return rmtNodes;
}
@@ -616,33 +611,14 @@ public class GridSpiTestContext implements IgniteSpiContext {
throw new UnsupportedOperationException();
}
- /** {@inheritDoc} */
- @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
- return new MetricRegistry(name, null, null, new NullLogger());
- }
-
- /** {@inheritDoc} */
- @Override public void removeMetricRegistry(String name) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
- // No-op.
- }
-
/**
* @param cacheName Cache name.
* @return Map representing cache.
*/
+ @SuppressWarnings("unchecked")
private <K, V> Map<K, V> getOrCreateCache(String cacheName) {
synchronized (cache) {
- Map<K, V> map = (Map<K, V>)cache.get(cacheName);
+ Map<K, V> map = cache.get(cacheName);
if (map == null)
cache.put(cacheName, map = new ConcurrentHashMap<>());
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
index f935e05e..a154fe5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
+import org.apache.ignite.internal.resources.MetricManagerResource;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.logger.NullLogger;
@@ -87,8 +88,10 @@ public class IgniteTestResources {
return U.IGNITE_MBEANS_DISABLED ? null : ManagementFactory.getPlatformMBeanServer();
}
- /** */
- public IgniteTestResources() {
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public IgniteTestResources() throws IgniteCheckedException {
if (SensitiveInfoTestLoggerProxy.TEST_SENSITIVE)
log = new SensitiveInfoTestLoggerProxy(rootLog.getLogger(getClass()), null, null, null);
else
@@ -104,7 +107,7 @@ public class IgniteTestResources {
/**
* @param cfg Ignite configuration
*/
- public IgniteTestResources(IgniteConfiguration cfg) {
+ public IgniteTestResources(IgniteConfiguration cfg) throws IgniteCheckedException {
this.cfg = cfg;
this.log = rootLog.getLogger(getClass());
this.jmx = prepareMBeanServer();
@@ -115,7 +118,7 @@ public class IgniteTestResources {
/**
* @param jmx JMX server.
*/
- public IgniteTestResources(MBeanServer jmx) {
+ public IgniteTestResources(MBeanServer jmx) throws IgniteCheckedException {
assert jmx != null;
this.jmx = jmx;
@@ -127,7 +130,7 @@ public class IgniteTestResources {
/**
* @param log Logger.
*/
- public IgniteTestResources(IgniteLogger log) {
+ public IgniteTestResources(IgniteLogger log) throws IgniteCheckedException {
assert log != null;
this.log = log.getLogger(getClass());
@@ -164,7 +167,7 @@ public class IgniteTestResources {
*/
public void startThreads(boolean prestart) {
execSvc = new IgniteThreadPoolExecutor(nodeId.toString(), null, 40, 40, Long.MAX_VALUE,
- new LinkedBlockingQueue<>());
+ new LinkedBlockingQueue<Runnable>());
// Improve concurrency for testing.
if (prestart)
@@ -191,6 +194,7 @@ public class IgniteTestResources {
rsrcProc.injectBasicResource(target, LoggerResource.class, getLogger().getLogger(target.getClass()));
rsrcProc.injectBasicResource(target, IgniteInstanceResource.class,
new IgniteMock(null, locHost, nodeId, getMarshaller(), jmx, home, cfg));
+ rsrcProc.injectBasicResource(target, MetricManagerResource.class, ctx.metric());
}
/**