You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/01/19 12:05:44 UTC
[4/5] ignite git commit: IGNITE-4157 Use discovery custom messages
instead of marshaller cache - Fixes #1271.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 18235d2..cb673d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -40,14 +40,11 @@ public class GridIoPolicy {
/** Utility cache execution pool. */
public static final byte UTILITY_CACHE_POOL = 5;
- /** Marshaller cache execution pool. */
- public static final byte MARSH_CACHE_POOL = 6;
-
/** IGFS pool. */
- public static final byte IGFS_POOL = 7;
+ public static final byte IGFS_POOL = 6;
/** Pool for handling distributed index range requests. */
- public static final byte IDX_POOL = 8;
+ public static final byte IDX_POOL = 7;
/** Data streamer execution pool. */
public static final byte DATA_STREAMER_POOL = 9;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9aa4db1..d15a87a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.managers.discovery;
import java.io.Externalizable;
-import java.io.Serializable;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
@@ -104,6 +103,8 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
@@ -642,41 +643,40 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
});
spi.setDataExchange(new DiscoverySpiDataExchange() {
- @Override public Map<Integer, Serializable> collect(UUID nodeId) {
- assert nodeId != null;
+ @Override public DiscoveryDataBag collect(DiscoveryDataBag dataBag) {
+ assert dataBag != null;
+ assert dataBag.joiningNodeId() != null;
- Map<Integer, Serializable> data = new HashMap<>();
-
- for (GridComponent comp : ctx.components()) {
- Serializable compData = comp.collectDiscoveryData(nodeId);
-
- if (compData != null) {
- assert comp.discoveryDataType() != null;
-
- data.put(comp.discoveryDataType().ordinal(), compData);
- }
+ if (ctx.localNodeId().equals(dataBag.joiningNodeId())) {
+ for (GridComponent c : ctx.components())
+ c.collectJoiningNodeData(dataBag);
+ }
+ else {
+ for (GridComponent c : ctx.components())
+ c.collectGridNodeData(dataBag);
}
- return data;
+ return dataBag;
}
- @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) {
- for (Map.Entry<Integer, Serializable> e : data.entrySet()) {
- GridComponent comp = null;
-
+ @Override public void onExchange(DiscoveryDataBag dataBag) {
+ if (ctx.localNodeId().equals(dataBag.joiningNodeId())) {
+ // The NodeAdded message has reached the joining node after a round-trip over the ring.
for (GridComponent c : ctx.components()) {
- if (c.discoveryDataType() != null && c.discoveryDataType().ordinal() == e.getKey()) {
- comp = c;
-
- break;
- }
+ if (c.discoveryDataType() != null)
+ c.onGridDataReceived(dataBag.gridDiscoveryData(c.discoveryDataType().ordinal()));
}
+ }
+ else {
+ // Discovery data from the newly joined node has to be applied to this node, which is already in the topology.
+ for (GridComponent c : ctx.components()) {
+ if (c.discoveryDataType() != null) {
+ JoiningNodeDiscoveryData data =
+ dataBag.newJoinerDiscoveryData(c.discoveryDataType().ordinal());
- if (comp != null)
- comp.onDiscoveryDataReceived(joiningNodeId, nodeId, e.getValue());
- else {
- if (log.isDebugEnabled())
- log.debug("Received discovery data for unknown component: " + e.getKey());
+ if (data != null)
+ c.onJoiningNodeDataReceived(data);
+ }
}
}
}
@@ -1555,6 +1555,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
return discoCache().allNodes();
}
+ /** @return All alive server nodes in topology. */
+ public Collection<ClusterNode> aliveSrvNodes() {
+ return discoCache().aliveSrvNodes();
+ }
+
/** @return Full topology size. */
public int size() {
return discoCache().allNodes().size();
@@ -2538,6 +2543,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Highest node order. */
private final long maxOrder;
+ /** Alive server nodes */
+ private final Collection<ClusterNode> aliveSrvNodes;
+
/**
* Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link
* #maskNull(String)} before passing raw cache names to it.
@@ -2589,6 +2597,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE);
nodesByVer = new TreeMap<>();
+ List<ClusterNode> aliveSrvNodesList = new ArrayList<>(allNodes.size());
+
long maxOrder0 = 0;
Set<String> nearEnabledSet = new HashSet<>();
@@ -2640,8 +2650,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
- if (hasCaches && alive(node.id()) && !CU.clientNode(node))
- aliveSrvNodesWithCaches.put(node, Boolean.TRUE);
+ if (alive(node.id()) && !CU.clientNode(node)) {
+ aliveSrvNodesList.add(node);
+
+ if (hasCaches)
+ aliveSrvNodesWithCaches.put(node, Boolean.TRUE);
+ }
IgniteProductVersion nodeVer = U.productVersion(node);
@@ -2673,6 +2687,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
maxOrder = maxOrder0;
+ aliveSrvNodes = Collections.unmodifiableList(aliveSrvNodesList);
+
allCacheNodes = Collections.unmodifiableMap(cacheMap);
rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap);
affCacheNodes = Collections.unmodifiableMap(dhtNodesMap);
@@ -2770,6 +2786,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * Gets all alive server nodes.
+ */
+ Collection<ClusterNode> aliveSrvNodes() {
+ return aliveSrvNodes;
+ }
+
+ /**
* Gets all remote nodes that have at least one cache configured.
*
* @param topVer Topology version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index e4896fd..4b4aec5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors;
-import java.io.Serializable;
-import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
@@ -28,6 +26,9 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.jetbrains.annotations.Nullable;
/**
@@ -79,12 +80,22 @@ public abstract class GridProcessorAdapter implements GridProcessor {
}
/** {@inheritDoc} */
- @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
- return null;
+ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onGridDataReceived(GridDiscoveryData data) {
+ // No-op.
}
/** {@inheritDoc} */
- @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
+ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
index 4886e61..c5855d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheType.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
@@ -38,12 +37,7 @@ public enum CacheType {
/**
* Internal replicated cache, should use separate thread pool.
*/
- UTILITY(false, UTILITY_CACHE_POOL),
-
- /**
- * Internal marshaller cache, should use separate thread pool.
- */
- MARSHALLER(false, MARSH_CACHE_POOL);
+ UTILITY(false, UTILITY_CACHE_POOL);
/** */
private final boolean userCache;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index e414160..1bd7442 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -501,7 +501,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Override public final GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) {
assert !CU.isUtilityCache(ctx.name());
assert !CU.isAtomicsCache(ctx.name());
- assert !CU.isMarshallerCache(ctx.name());
CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false, null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9487589..c5725e7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -118,6 +118,9 @@ import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
@@ -138,6 +141,7 @@ import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
import static org.apache.ignite.configuration.DeploymentMode.SHARED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
import static org.apache.ignite.internal.IgniteComponentType.JTA;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG;
@@ -613,7 +617,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.config().getCacheStoreSessionListenerFactories()));
for (int i = 0; i < cfgs.length; i++) {
- if (ctx.config().isDaemon() && !CU.isMarshallerCache(cfgs[i].getName()))
+ if (ctx.config().isDaemon())
continue;
cloneCheckSerializable(cfgs[i]);
@@ -644,8 +648,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (CU.isUtilityCache(cfg.getName()))
cacheType = CacheType.UTILITY;
- else if (CU.isMarshallerCache(cfg.getName()))
- cacheType = CacheType.MARSHALLER;
else if (internalCaches.contains(maskNull(cfg.getName())))
cacheType = CacheType.INTERNAL;
else
@@ -768,7 +770,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
// Start dynamic caches received from collect discovery data.
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- if (ctx.config().isDaemon() && !CU.isMarshallerCache(desc.cacheConfiguration().getName()))
+ if (ctx.config().isDaemon())
continue;
desc.clearRemoteConfigurations();
@@ -818,8 +820,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (GridCacheAdapter<?, ?> cache : caches.values())
onKernalStart(cache);
- ctx.marshallerContext().onMarshallerCacheStarted(ctx);
-
if (!ctx.config().isDaemon())
ctx.cacheObjects().onUtilityCacheStarted();
@@ -845,7 +845,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
- assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started";
assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started";
}
@@ -985,7 +984,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
boolean stopped;
- boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name);
+ boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name);
if (!sysCache) {
DynamicCacheDescriptor oldDesc = cachesOnDisconnect.get(maskNull(name));
@@ -1890,12 +1889,24 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
- return DiscoveryDataExchangeType.CACHE_PROC;
+ return CACHE_PROC;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+ dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId()));
}
/** {@inheritDoc} */
- @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
- boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ dataBag.addNodeSpecificData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId()));
+ }
+
+ /**
+ * @param joiningNodeId Joining node id.
+ */
+ private Serializable getDiscoveryData(UUID joiningNodeId) {
+ boolean reconnect = ctx.localNodeId().equals(joiningNodeId) && cachesOnDisconnect != null;
// Collect dynamically started caches to a single object.
Collection<DynamicCacheChangeRequest> reqs;
@@ -1907,178 +1918,225 @@ public class GridCacheProcessor extends GridProcessorAdapter {
clientNodesMap = U.newHashMap(caches.size());
- for (GridCacheAdapter<?, ?> cache : caches.values()) {
- DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name()));
+ collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId);
+ }
+ else {
+ reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
- if (desc == null)
- continue;
+ clientNodesMap = ctx.discovery().clientNodesMap();
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cache.name(), null);
+ collectDataOnGridNode(reqs);
+ }
- req.startCacheConfiguration(desc.cacheConfiguration());
+ DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);
- req.cacheType(desc.cacheType());
+ batch.clientNodes(clientNodesMap);
- req.deploymentId(desc.deploymentId());
+ batch.clientReconnect(reconnect);
+
+ // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
+ batch.id(null);
+
+ return batch;
+ }
- req.receivedFrom(desc.receivedFrom());
+ /**
+ * @param reqs Collection to which the created cache change requests are added.
+ */
+ private void collectDataOnGridNode(Collection<DynamicCacheChangeRequest> reqs) {
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
- reqs.add(req);
+ req.startCacheConfiguration(desc.cacheConfiguration());
- Boolean nearEnabled = cache.isNear();
+ req.cacheType(desc.cacheType());
- Map<UUID, Boolean> map = U.newHashMap(1);
+ req.deploymentId(desc.deploymentId());
- map.put(nodeId, nearEnabled);
+ req.receivedFrom(desc.receivedFrom());
- clientNodesMap.put(cache.name(), map);
- }
+ reqs.add(req);
}
- else {
- reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
+ for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
- req.startCacheConfiguration(desc.cacheConfiguration());
+ req.startCacheConfiguration(desc.cacheConfiguration());
- req.cacheType(desc.cacheType());
+ req.template(true);
- req.deploymentId(desc.deploymentId());
+ reqs.add(req);
+ }
+ }
- req.receivedFrom(desc.receivedFrom());
+ /**
+ * @param reqs requests.
+ * @param clientNodesMap Client nodes map.
+ * @param nodeId Node id.
+ */
+ private void collectDataOnReconnectingNode(
+ Collection<DynamicCacheChangeRequest> reqs,
+ Map<String, Map<UUID, Boolean>> clientNodesMap,
+ UUID nodeId
+ ) {
+ for (GridCacheAdapter<?, ?> cache : caches.values()) {
+ DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name()));
- reqs.add(req);
- }
+ if (desc == null)
+ continue;
- for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cache.name(), null);
- req.startCacheConfiguration(desc.cacheConfiguration());
+ req.startCacheConfiguration(desc.cacheConfiguration());
- req.template(true);
+ req.cacheType(desc.cacheType());
- reqs.add(req);
- }
+ req.deploymentId(desc.deploymentId());
- clientNodesMap = ctx.discovery().clientNodesMap();
- }
+ req.receivedFrom(desc.receivedFrom());
- DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);
+ reqs.add(req);
- batch.clientNodes(clientNodesMap);
+ Boolean nearEnabled = cache.isNear();
- batch.clientReconnect(reconnect);
+ Map<UUID, Boolean> map = U.newHashMap(1);
- // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
- batch.id(null);
+ map.put(nodeId, nearEnabled);
- return batch;
+ clientNodesMap.put(cache.name(), map);
+ }
}
/** {@inheritDoc} */
- @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
- if (data instanceof DynamicCacheChangeBatch) {
- DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data;
+ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
+ if (data.hasJoiningNodeData()) {
+ Serializable joiningNodeData = data.joiningNodeData();
+ if (joiningNodeData instanceof DynamicCacheChangeBatch)
+ onDiscoDataReceived(
+ data.joiningNodeId(),
+ data.joiningNodeId(),
+ (DynamicCacheChangeBatch) joiningNodeData);
+ }
+ }
- if (batch.clientReconnect()) {
- if (ctx.clientDisconnected()) {
- if (clientReconnectReqs == null)
- clientReconnectReqs = new LinkedHashMap<>();
+ /** {@inheritDoc} */
+ @Override public void onGridDataReceived(GridDiscoveryData data) {
+ Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
- clientReconnectReqs.put(joiningNodeId, batch);
+ if (nodeSpecData != null) {
+ for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet()) {
+ if (e.getValue() != null && e.getValue() instanceof DynamicCacheChangeBatch) {
+ DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch) e.getValue();
- return;
+ onDiscoDataReceived(data.joiningNodeId(), e.getKey(), batch);
}
+ }
+ }
+ }
- processClientReconnectData(joiningNodeId, batch);
+ /**
+ * @param joiningNodeId Joining node id.
+ * @param rmtNodeId Remote node id.
+ * @param batch Batch.
+ */
+ private void onDiscoDataReceived(UUID joiningNodeId, UUID rmtNodeId, DynamicCacheChangeBatch batch) {
+ if (batch.clientReconnect()) {
+ if (ctx.clientDisconnected()) {
+ if (clientReconnectReqs == null)
+ clientReconnectReqs = new LinkedHashMap<>();
+
+ clientReconnectReqs.put(joiningNodeId, batch);
+
+ return;
}
- else {
- for (DynamicCacheChangeRequest req : batch.requests()) {
- initReceivedCacheConfiguration(req);
- if (req.template()) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
+ processClientReconnectData(joiningNodeId, batch);
+ }
+ else {
+ for (DynamicCacheChangeRequest req : batch.requests()) {
+ initReceivedCacheConfiguration(req);
+
+ if (req.template()) {
+ CacheConfiguration ccfg = req.startCacheConfiguration();
- assert ccfg != null : req;
+ assert ccfg != null : req;
- DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName()));
+ DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName()));
- if (existing == null) {
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ if (existing == null) {
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
ctx,
ccfg,
req.cacheType(),
true,
req.deploymentId());
- registeredTemplates.put(maskNull(req.cacheName()), desc);
- }
-
- continue;
+ registeredTemplates.put(maskNull(req.cacheName()), desc);
}
- DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName()));
+ continue;
+ }
- if (req.start() && !req.clientStartOnly()) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
+ DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName()));
- if (existing != null) {
- if (joiningNodeId.equals(ctx.localNodeId())) {
- existing.receivedFrom(req.receivedFrom());
+ if (req.start() && !req.clientStartOnly()) {
+ CacheConfiguration ccfg = req.startCacheConfiguration();
- existing.deploymentId(req.deploymentId());
- }
+ if (existing != null) {
+ if (joiningNodeId.equals(ctx.localNodeId())) {
+ existing.receivedFrom(req.receivedFrom());
- if (existing.locallyConfigured()) {
- existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
+ existing.deploymentId(req.deploymentId());
+ }
- ctx.discovery().setCacheFilter(
+ if (existing.locallyConfigured()) {
+ existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
+
+ ctx.discovery().setCacheFilter(
req.cacheName(),
ccfg.getNodeFilter(),
ccfg.getNearConfiguration() != null,
ccfg.getCacheMode());
- }
}
- else {
- assert req.cacheType() != null : req;
+ }
+ else {
+ assert req.cacheType() != null : req;
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
ctx,
ccfg,
req.cacheType(),
false,
req.deploymentId());
- // Received statically configured cache.
- if (req.initiatingNodeId() == null)
- desc.staticallyConfigured(true);
+ // Received statically configured cache.
+ if (req.initiatingNodeId() == null)
+ desc.staticallyConfigured(true);
- if (joiningNodeId.equals(ctx.localNodeId()))
- desc.receivedOnDiscovery(true);
+ if (joiningNodeId.equals(ctx.localNodeId()))
+ desc.receivedOnDiscovery(true);
- desc.receivedFrom(req.receivedFrom());
+ desc.receivedFrom(req.receivedFrom());
- DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc);
+ DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc);
- assert old == null : old;
+ assert old == null : old;
- ctx.discovery().setCacheFilter(
+ ctx.discovery().setCacheFilter(
req.cacheName(),
ccfg.getNodeFilter(),
ccfg.getNearConfiguration() != null,
ccfg.getCacheMode());
- }
}
}
+ }
- if (!F.isEmpty(batch.clientNodes())) {
- for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
- String cacheName = entry.getKey();
+ if (!F.isEmpty(batch.clientNodes())) {
+ for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
+ String cacheName = entry.getKey();
- for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
- ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
- }
+ for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
+ ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
}
}
}
@@ -2098,7 +2156,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
String name = req.cacheName();
- boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name);
+ boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name);
if (!sysCache) {
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
@@ -3127,13 +3185,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @return Marshaller system cache.
- */
- public GridCacheAdapter<Integer, String> marshallerCache() {
- return internalCache(CU.MARSH_CACHE_NAME);
- }
-
- /**
* Gets utility cache.
*
* @return Utility cache.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 0f855fe..e694e30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -43,7 +43,6 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
!cctx.config().isEagerTtl() ||
CU.isAtomicsCache(cctx.name()) ||
- CU.isMarshallerCache(cctx.name()) ||
CU.isUtilityCache(cctx.name()) ||
(cctx.kernalContext().clientNode() && cctx.config().getNearConfiguration() == null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 969c41a..61a57f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -154,9 +154,6 @@ public class GridCacheUtils {
/** Atomics system cache name. */
public static final String ATOMICS_CACHE_NAME = "ignite-atomics-sys-cache";
- /** Marshaller system cache name. */
- public static final String MARSH_CACHE_NAME = "ignite-marshaller-sys-cache";
-
/** */
public static final String CACHE_MSG_LOG_CATEGORY = "org.apache.ignite.cache.msg";
@@ -1176,14 +1173,6 @@ public class GridCacheUtils {
/**
* @param cacheName Cache name.
- * @return {@code True} if this is marshaller system cache.
- */
- public static boolean isMarshallerCache(String cacheName) {
- return MARSH_CACHE_NAME.equals(cacheName);
- }
-
- /**
- * @param cacheName Cache name.
* @return {@code True} if this is utility system cache.
*/
public static boolean isUtilityCache(String cacheName) {
@@ -1203,7 +1192,7 @@ public class GridCacheUtils {
* @return {@code True} if system cache.
*/
public static boolean isSystemCache(String cacheName) {
- return isMarshallerCache(cacheName) || isUtilityCache(cacheName) || isHadoopSystemCache(cacheName) ||
+ return isUtilityCache(cacheName) || isHadoopSystemCache(cacheName) ||
isAtomicsCache(cacheName);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index a00cf3e..976f05f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -62,6 +62,19 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
}
if (topVer != null) {
+ try {
+ IgniteCheckedException err = tx.txState().validateTopology(cctx, topologyReadLock());
+
+ if (err != null) {
+ onDone(err);
+
+ return;
+ }
+ }
+ finally {
+ topologyReadUnlock();
+ }
+
tx.topologyVersion(topVer);
cctx.mvcc().addFuture(this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 85c01d9..a0ab0be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -2367,7 +2367,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
},
new P1<IgniteInternalCache<?, ?>>() {
@Override public boolean apply(IgniteInternalCache<?, ?> c) {
- return !CU.MARSH_CACHE_NAME.equals(c.name()) && !CU.UTILITY_CACHE_NAME.equals(c.name()) &&
+ return !CU.UTILITY_CACHE_NAME.equals(c.name()) &&
!CU.ATOMICS_CACHE_NAME.equals(c.name());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 6500cf3..d1c8b2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cluster;
import java.io.Serializable;
import java.lang.ref.WeakReference;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
@@ -36,9 +37,12 @@ import org.apache.ignite.internal.util.GridTimerTask;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CLUSTER_PROC;
import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
/**
@@ -98,11 +102,24 @@ public class ClusterProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
- return DiscoveryDataExchangeType.CLUSTER_PROC;
+ return CLUSTER_PROC;
}
/** {@inheritDoc} */
- @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
+ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+ dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), getDiscoveryData());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), getDiscoveryData());
+ }
+
+
+ /**
+ * @return Discovery data.
+ */
+ private Serializable getDiscoveryData() {
HashMap<String, Object> map = new HashMap<>();
map.put(ATTR_UPDATE_NOTIFIER_STATUS, notifyEnabled.get());
@@ -111,16 +128,36 @@ public class ClusterProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
- if (joiningNodeId.equals(ctx.localNodeId())) {
- Map<String, Object> map = (Map<String, Object>)data;
+ @Override public void onGridDataReceived(GridDiscoveryData data) {
+ Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
- if (map != null && map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS))
- notifyEnabled.set((Boolean)map.get(ATTR_UPDATE_NOTIFIER_STATUS));
+ if (nodeSpecData != null) {
+ Boolean lstFlag = findLastFlag(nodeSpecData.values());
+
+ if (lstFlag != null)
+ notifyEnabled.set(lstFlag);
}
}
+
+ /**
+ * @param vals collection to seek through.
+ */
+ private Boolean findLastFlag(Collection<Serializable> vals) {
+ Boolean flag = null;
+
+ for (Serializable ser : vals) {
+ if (ser != null) {
+ Map<String, Object> map = (Map<String, Object>) ser;
+
+ if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS))
+ flag = (Boolean) map.get(ATTR_UPDATE_NOTIFIER_STATUS);
+ }
+ }
+
+ return flag;
+ }
+
/** {@inheritDoc} */
@Override public void onKernalStart() throws IgniteCheckedException {
if (notifyEnabled.get()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 9fd9b6d..b0db510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -78,6 +78,9 @@ import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -85,6 +88,7 @@ import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
@@ -319,8 +323,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
});
- ctx.marshallerContext().onContinuousProcessorStarted(ctx);
-
ctx.cacheObjects().onContinuousProcessorStarted(ctx);
ctx.service().onContinuousProcessorStarted(ctx);
@@ -403,36 +405,42 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
- return DiscoveryDataExchangeType.CONTINUOUS_PROC;
+ return CONTINUOUS_PROC;
}
/** {@inheritDoc} */
- @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
- if (log.isDebugEnabled()) {
- log.debug("collectDiscoveryData [node=" + nodeId +
- ", loc=" + ctx.localNodeId() +
- ", locInfos=" + locInfos +
- ", clientInfos=" + clientInfos +
- ']');
- }
+ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+ Serializable data = getDiscoveryData(dataBag.joiningNodeId());
- if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
- Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size());
+ if (data != null)
+ dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), data);
+ }
- for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) {
- Map<UUID, LocalRoutineInfo> cp = U.newHashMap(e.getValue().size());
+ /** {@inheritDoc} */
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ Serializable data = getDiscoveryData(dataBag.joiningNodeId());
- for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet())
- cp.put(e0.getKey(), e0.getValue());
+ if (data != null)
+ dataBag.addNodeSpecificData(CONTINUOUS_PROC.ordinal(), data);
+ }
- clientInfos0.put(e.getKey(), cp);
- }
+ /**
+ * @param joiningNodeId Joining node id.
+ */
+ private Serializable getDiscoveryData(UUID joiningNodeId) {
+ if (log.isDebugEnabled()) {
+ log.debug("collectDiscoveryData [node=" + joiningNodeId +
+ ", loc=" + ctx.localNodeId() +
+ ", locInfos=" + locInfos +
+ ", clientInfos=" + clientInfos +
+ ']');
+ }
- if (nodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) {
- Map<UUID, LocalRoutineInfo> infos = new HashMap<>();
+ if (!joiningNodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
+ Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = copyClientInfos(clientInfos);
- for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet())
- infos.put(e.getKey(), e.getValue());
+ if (joiningNodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) {
+ Map<UUID, LocalRoutineInfo> infos = copyLocalInfos(locInfos);
clientInfos0.put(ctx.localNodeId(), infos);
}
@@ -445,31 +453,75 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
LocalRoutineInfo info = e.getValue();
data.addItem(new DiscoveryDataItem(routineId,
- info.prjPred,
- info.hnd,
- info.bufSize,
- info.interval,
- info.autoUnsubscribe));
+ info.prjPred,
+ info.hnd,
+ info.bufSize,
+ info.interval,
+ info.autoUnsubscribe));
}
return data;
}
-
return null;
}
- /** {@inheritDoc} */
- @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
- DiscoveryData data = (DiscoveryData)obj;
+ /**
+ * Copies the given client infos two levels deep: a new outer map and a new inner map per client are created,
+ * while the routine info objects themselves are shared with the source.
+ *
+ * @param clientInfos Client infos.
+ * @return Copy of the given map.
+ */
+ private Map<UUID, Map<UUID, LocalRoutineInfo>> copyClientInfos(Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) {
+ Map<UUID, Map<UUID, LocalRoutineInfo>> copy = U.newHashMap(clientInfos.size());
+
+ for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> clientEntry : clientInfos.entrySet()) {
+ Map<UUID, LocalRoutineInfo> routines = clientEntry.getValue();
+
+ Map<UUID, LocalRoutineInfo> routinesCp = U.newHashMap(routines.size());
+
+ routinesCp.putAll(routines);
+
+ copy.put(clientEntry.getKey(), routinesCp);
+ }
+
+ return copy;
+ }
+
+ /**
+ * Copies the given local routine infos into a new map; the routine info objects themselves are shared
+ * with the source.
+ *
+ * @param locInfos Local infos.
+ * @return Copy of the given map.
+ */
+ private Map<UUID, LocalRoutineInfo> copyLocalInfos(Map<UUID, LocalRoutineInfo> locInfos) {
+ Map<UUID, LocalRoutineInfo> copy = U.newHashMap(locInfos.size());
+
+ copy.putAll(locInfos);
+
+ return copy;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
if (log.isDebugEnabled()) {
- log.info("onDiscoveryDataReceived [joining=" + joiningNodeId +
- ", rmtNodeId=" + rmtNodeId +
- ", loc=" + ctx.localNodeId() +
- ", data=" + data +
- ']');
+ // Logged at debug level so that the message matches the isDebugEnabled() guard above.
+ log.debug("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() +
+ ", loc=" + ctx.localNodeId() +
+ ", data=" + data.joiningNodeData() +
+ ']');
}
+ if (data.hasJoiningNodeData())
+ onDiscoDataReceived((DiscoveryData) data.joiningNodeData());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onGridDataReceived(GridDiscoveryData data) {
+ Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
+
+ if (nodeSpecData == null)
+ return;
+
+ // Only the per-node payloads matter here, the node IDs they are keyed by are not used.
+ for (Serializable nodeData : nodeSpecData.values())
+ onDiscoDataReceived((DiscoveryData) nodeData);
+ }
+
+ /**
+ * @param data received discovery data.
+ */
+ private void onDiscoDataReceived(DiscoveryData data) {
if (!ctx.isDaemon() && data != null) {
for (DiscoveryDataItem item : data.items) {
try {
@@ -478,14 +530,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// Register handler only if local node passes projection predicate.
if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) &&
- !locInfos.containsKey(item.routineId))
+ !locInfos.containsKey(item.routineId))
registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
- item.autoUnsubscribe, false);
+ item.autoUnsubscribe, false);
if (!item.autoUnsubscribe)
// Register routine locally.
locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(
- item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
+ item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to register continuous handler.", e);
@@ -508,12 +560,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
registerHandler(clientNodeId,
- routineId,
- info.hnd,
- info.bufSize,
- info.interval,
- info.autoUnsubscribe,
- false);
+ routineId,
+ info.hnd,
+ info.bufSize,
+ info.interval,
+ info.autoUnsubscribe,
+ false);
}
}
catch (IgniteCheckedException err) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
new file mode 100644
index 0000000..a361760
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Future responsible for requesting missing marshaller mapping from one of available server nodes.
+ *
+ * Handles scenarios when server nodes leave cluster. If node that was requested for mapping leaves the cluster or fails,
+ * mapping is automatically requested from the next node available in topology.
+ *
+ * After construction the queue of candidate nodes and the pending node are accessed only while synchronized
+ * on this future; completion of the future itself is always performed outside of that synchronized section.
+ */
+final class ClientRequestFuture extends GridFutureAdapter<MappingExchangeResult> {
+ /** Logger reference passed to {@code U.logger} when the class-wide logger is created. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Class-wide logger, lazily initialized in the constructor. */
+ private static IgniteLogger log;
+
+ /** IO manager used to send mapping requests. */
+ private final GridIoManager ioMgr;
+
+ /** Discovery manager: source of the alive server nodes and of the check that a requested node is still known. */
+ private final GridDiscoveryManager discoMgr;
+
+ /** Item whose platform ID and type ID are requested; also the key removed from the sync map on completion. */
+ private final MarshallerMappingItem item;
+
+ /** Map this future removes its item from once it is done. */
+ private final Map<MarshallerMappingItem, ClientRequestFuture> syncMap;
+
+ /** Server nodes that have not been asked yet (copy of the alive server nodes taken in the constructor). */
+ private final Queue<ClusterNode> aliveSrvNodes;
+
+ /** Node the latest request was sent to, {@code null} if there is no such node. */
+ private ClusterNode pendingNode;
+
+ /**
+ * @param ctx Context.
+ * @param item Item.
+ * @param syncMap Sync map.
+ */
+ ClientRequestFuture(
+ GridKernalContext ctx,
+ MarshallerMappingItem item,
+ Map<MarshallerMappingItem, ClientRequestFuture> syncMap
+ ) {
+ ioMgr = ctx.io();
+ discoMgr = ctx.discovery();
+ aliveSrvNodes = new LinkedList<>(discoMgr.aliveSrvNodes());
+ this.item = item;
+ this.syncMap = syncMap;
+
+ if (log == null)
+ log = U.logger(ctx, logRef, ClientRequestFuture.class);
+ }
+
+ /**
+ * Sends the mapping request to the next server node taken from the queue of candidates.
+ *
+ * A node is skipped when sending to it fails (a warning is logged) or when discovery no longer knows the node
+ * right after the send. The first node that passes both checks becomes the pending node. If there is
+ * no pending node after the loop, the future is completed with a failure result.
+ */
+ void requestMapping() {
+ boolean noSrvsInCluster;
+
+ synchronized (this) {
+ while (!aliveSrvNodes.isEmpty()) {
+ ClusterNode srvNode = aliveSrvNodes.poll();
+
+ try {
+ ioMgr.send(
+ srvNode,
+ GridTopic.TOPIC_MAPPING_MARSH,
+ new MissingMappingRequestMessage(
+ item.platformId(),
+ item.typeId()),
+ GridIoPolicy.SYSTEM_POOL);
+
+ if (discoMgr.node(srvNode.id()) == null)
+ continue;
+
+ pendingNode = srvNode;
+
+ break;
+ }
+ catch (IgniteCheckedException ignored) {
+ U.warn(log,
+ "Failed to request marshaller mapping from remote node (proceeding with the next one): "
+ + srvNode);
+ }
+ }
+
+ noSrvsInCluster = pendingNode == null;
+ }
+
+ if (noSrvsInCluster)
+ onDone(MappingExchangeResult.createFailureResult(
+ new IgniteCheckedException(
+ "All server nodes have left grid, cannot request mapping [platformId: "
+ + item.platformId()
+ + "; typeId: "
+ + item.typeId() + "]")));
+ }
+
+ /**
+ * Completes the future with the given result, but only if the response came from the node the latest request
+ * was sent to; responses from any other node are ignored.
+ *
+ * @param nodeId Node ID.
+ * @param res Mapping Request Result.
+ */
+ void onResponse(UUID nodeId, MappingExchangeResult res) {
+ MappingExchangeResult res0 = null;
+
+ synchronized (this) {
+ if (pendingNode != null && pendingNode.id().equals(nodeId))
+ res0 = res;
+ }
+
+ if (res0 != null)
+ onDone(res0);
+ }
+
+ /**
+ * If left node is actually the one latest mapping request was sent to,
+ * request is sent again to the next node in topology.
+ *
+ * @param leftNodeId Left node id.
+ */
+ void onNodeLeft(UUID leftNodeId) {
+ boolean reqAgain = false;
+
+ synchronized (this) {
+ if (pendingNode != null && pendingNode.id().equals(leftNodeId)) {
+ aliveSrvNodes.remove(pendingNode);
+
+ pendingNode = null;
+
+ reqAgain = true;
+ }
+ }
+
+ if (reqAgain)
+ requestMapping();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Additionally removes the item from the sync map when the parent implementation returns {@code true}.
+ * The result is asserted to be non-null.
+ */
+ @Override public boolean onDone(@Nullable MappingExchangeResult res, @Nullable Throwable err) {
+ assert res != null;
+
+ boolean done = super.onDone(res, err);
+
+ if (done)
+ syncMap.remove(item);
+
+ return done;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
new file mode 100644
index 0000000..7356e6c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.MARSHALLER_PROC;
+import static org.apache.ignite.internal.GridTopic.TOPIC_MAPPING_MARSH;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+
+/**
+ * Processor responsible for managing custom {@link DiscoveryCustomMessage}
+ * events for exchanging marshalling mappings between nodes in grid.
+ *
+ * In particular it processes two flows:
+ * <ul>
+ * <li>
+ * Some node, server or client, wants to add new mapping for some class.
+ * In that case a pair of {@link MappingProposedMessage} and {@link MappingAcceptedMessage} events is used.
+ * </li>
+ * <li>
+ * As discovery events are delivered to clients asynchronously,
+ * client node may not have some mapping when server nodes in the grid are already allowed to use the mapping.
+ * In that situation client sends a {@link MissingMappingRequestMessage} request
+ * and processor handles it as well as {@link MissingMappingResponseMessage} message.
+ * </li>
+ * </ul>
+ */
+public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
+ /** */
+ private final MarshallerContextImpl marshallerCtx;
+
+ /** */
+ private final ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchangeSyncMap
+ = new ConcurrentHashMap8<>();
+
+ /** */
+ private final ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap = new ConcurrentHashMap8<>();
+
+ /**
+ * Creates the processor and remembers the marshaller context provided by the kernal context.
+ *
+ * @param ctx Kernal context.
+ */
+ public GridMarshallerMappingProcessor(GridKernalContext ctx) {
+ super(ctx);
+
+ marshallerCtx = ctx.marshallerContext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ GridDiscoveryManager discoMgr = ctx.discovery();
+ GridIoManager ioMgr = ctx.io();
+
+ MarshallerMappingTransport transport = new MarshallerMappingTransport(
+ ctx,
+ mappingExchangeSyncMap,
+ clientReqSyncMap
+ );
+ marshallerCtx.onMarshallerProcessorStarted(ctx, transport);
+
+ discoMgr.setCustomEventListener(MappingProposedMessage.class, new MarshallerMappingExchangeListener());
+
+ discoMgr.setCustomEventListener(MappingAcceptedMessage.class, new MappingAcceptedListener());
+
+ if (!ctx.clientNode())
+ ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingRequestListener(ioMgr));
+ else
+ ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingResponseListener());
+
+ if (ctx.clientNode())
+ ctx.event().addLocalEventListener(new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ DiscoveryEvent evt0 = (DiscoveryEvent) evt;
+
+ if (!ctx.isStopping()) {
+ for (ClientRequestFuture fut : clientReqSyncMap.values())
+ fut.onNodeLeft(evt0.eventNode().id());
+ }
+ }
+ }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ }
+
+ /**
+ * Listener for {@link MissingMappingRequestMessage}: resolves the requested mapping through the marshaller
+ * context and sends the outcome back to the requesting node.
+ */
+ private final class MissingMappingRequestListener implements GridMessageListener {
+ /** IO manager used to send responses. */
+ private final GridIoManager ioMgr;
+
+ /**
+ * @param ioMgr Io manager.
+ */
+ MissingMappingRequestListener(GridIoManager ioMgr) {
+ this.ioMgr = ioMgr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof MissingMappingRequestMessage : msg;
+
+ MissingMappingRequestMessage req = (MissingMappingRequestMessage) msg;
+
+ byte reqPlatformId = req.platformId();
+ int reqTypeId = req.typeId();
+
+ String clsName = marshallerCtx.resolveMissedMapping(reqPlatformId, reqTypeId);
+
+ MissingMappingResponseMessage resp = new MissingMappingResponseMessage(reqPlatformId, reqTypeId, clsName);
+
+ try {
+ ioMgr.send(nodeId, TOPIC_MAPPING_MARSH, resp, SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send missing mapping response.", e);
+ }
+ }
+ }
+
+ /**
+ * Listener for {@link MissingMappingResponseMessage}: completes the client request future registered for
+ * the responded platform ID and type ID, if there is one.
+ */
+ private final class MissingMappingResponseListener implements GridMessageListener {
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof MissingMappingResponseMessage : msg;
+
+ MissingMappingResponseMessage resp = (MissingMappingResponseMessage) msg;
+
+ byte platformId = resp.platformId();
+ int typeId = resp.typeId();
+ String clsName = resp.className();
+
+ MarshallerMappingItem item = new MarshallerMappingItem(platformId, typeId, null);
+
+ GridFutureAdapter<MappingExchangeResult> reqFut = clientReqSyncMap.get(item);
+
+ // Nobody is waiting for this mapping.
+ if (reqFut == null)
+ return;
+
+ if (clsName == null) {
+ reqFut.onDone(MappingExchangeResult.createFailureResult(
+ new IgniteCheckedException(
+ "Failed to resolve mapping [platformId: "
+ + platformId
+ + ", typeId: "
+ + typeId + "]")));
+
+ return;
+ }
+
+ marshallerCtx.onMissedMappingResolved(item, clsName);
+
+ reqFut.onDone(MappingExchangeResult.createSuccessfulResult(clsName));
+ }
+ }
+
+ /**
+ * Listener for {@link MappingProposedMessage}: offers a proposed mapping to the marshaller context and marks
+ * the message as duplicated or conflicting; for a message already in conflict it fails the future of
+ * the node that originated the proposal.
+ */
+ private final class MarshallerMappingExchangeListener implements CustomEventListener<MappingProposedMessage> {
+ /** {@inheritDoc} */
+ @Override public void onCustomEvent(
+ AffinityTopologyVersion topVer,
+ ClusterNode snd,
+ MappingProposedMessage msg
+ ) {
+ if (ctx.isStopping() || msg.duplicated())
+ return;
+
+ if (msg.inConflict()) {
+ // Only the node that originated the proposal has a future to fail.
+ if (msg.origNodeId().equals(ctx.localNodeId())) {
+ GridFutureAdapter<MappingExchangeResult> proposeFut = mappingExchangeSyncMap.get(msg.mappingItem());
+
+ assert proposeFut != null: msg;
+
+ proposeFut.onDone(MappingExchangeResult.createFailureResult(
+ duplicateMappingException(msg.mappingItem(), msg.conflictingClassName())));
+ }
+
+ return;
+ }
+
+ MarshallerMappingItem proposed = msg.mappingItem();
+
+ String existingName = marshallerCtx.onMappingProposed(proposed);
+
+ if (existingName == null)
+ return;
+
+ if (existingName.equals(proposed.className()))
+ msg.markDuplicated();
+ else
+ msg.conflictingWithClass(existingName);
+ }
+
+ /**
+ * @param mappingItem Mapping item.
+ * @param conflictingClsName Conflicting class name.
+ * @return Exception describing the IDs of the item together with the old and the new class names.
+ */
+ private IgniteCheckedException duplicateMappingException(
+ MarshallerMappingItem mappingItem,
+ String conflictingClsName
+ ) {
+ String errMsg = "Duplicate ID [platformId=" + mappingItem.platformId()
+ + ", typeId=" + mappingItem.typeId()
+ + ", oldCls=" + conflictingClsName
+ + ", newCls=" + mappingItem.className() + "]";
+
+ return new IgniteCheckedException(errMsg);
+ }
+ }
+
+ /**
+ * Listener for {@link MappingAcceptedMessage}: passes the accepted item to the marshaller context and
+ * completes the local exchange future registered for that item, if there is one.
+ */
+ private final class MappingAcceptedListener implements CustomEventListener<MappingAcceptedMessage> {
+ /** {@inheritDoc} */
+ @Override public void onCustomEvent(
+ AffinityTopologyVersion topVer,
+ ClusterNode snd,
+ MappingAcceptedMessage msg
+ ) {
+ MarshallerMappingItem acceptedItem = msg.getMappingItem();
+
+ marshallerCtx.onMappingAccepted(acceptedItem);
+
+ GridFutureAdapter<MappingExchangeResult> waitingFut = mappingExchangeSyncMap.get(acceptedItem);
+
+ if (waitingFut == null)
+ return;
+
+ waitingFut.onDone(MappingExchangeResult.createSuccessfulResult(acceptedItem.className()));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ if (!dataBag.commonDataCollectedFor(MARSHALLER_PROC.ordinal()))
+ dataBag.addGridCommonData(MARSHALLER_PROC.ordinal(), marshallerCtx.getCachedMappings());
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void onGridDataReceived(GridDiscoveryData data) {
+ // Unchecked: common data is expected to be the list of mappings added in collectGridNodeData().
+ List<Map<Integer, MappedName>> mappings = (List<Map<Integer, MappedName>>) data.commonData();
+
+ if (mappings == null)
+ return;
+
+ for (int i = 0; i < mappings.size(); i++) {
+ Map<Integer, MappedName> map = mappings.get(i);
+
+ // The list index is handed over as the byte identifier of the map (presumably the platform ID);
+ // absent maps are skipped.
+ if (map != null)
+ marshallerCtx.onMappingDataReceived((byte) i, map);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ cancelFutures(MappingExchangeResult.createFailureResult(new IgniteClientDisconnectedCheckedException(
+ ctx.cluster().clientReconnectFuture(),
+ "Failed to propose or request mapping, client node disconnected.")));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ marshallerCtx.onMarshallerProcessorStop();
+
+ cancelFutures(MappingExchangeResult.createExchangeDisabledResult());
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+ return MARSHALLER_PROC;
+ }
+
+ /**
+ * Completes every pending mapping exchange future and every pending client request future with the given result.
+ *
+ * @param res Response.
+ */
+ private void cancelFutures(MappingExchangeResult res) {
+ for (GridFutureAdapter<MappingExchangeResult> exchangeFut : mappingExchangeSyncMap.values())
+ exchangeFut.onDone(res);
+
+ for (ClientRequestFuture reqFut : clientReqSyncMap.values())
+ reqFut.onDone(res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
new file mode 100644
index 0000000..c13c48e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Contains mapped class name and boolean flag showing whether this mapping was accepted by other nodes or not.
+ *
+ * Instances are immutable: both fields are final and set in the constructor.
+ */
+public final class MappedName implements Serializable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Mapped class name. */
+ private final String clsName;
+
+ /** Flag showing whether the mapping was accepted by other nodes. */
+ private final boolean accepted;
+
+ /**
+ * @param clsName Class name.
+ * @param accepted Accepted.
+ */
+ public MappedName(String clsName, boolean accepted) {
+ this.clsName = clsName;
+ this.accepted = accepted;
+ }
+
+ /**
+ * @return Mapped class name.
+ */
+ public String className() {
+ return clsName;
+ }
+
+ /**
+ * @return Value of the accepted flag passed to the constructor.
+ */
+ public boolean accepted() {
+ return accepted;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MappedName.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
new file mode 100644
index 0000000..23c2858
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Acknowledgement sent for a successfully proposed new mapping (see {@link MappingProposedMessage}).
+ *
+ * Nodes that were waiting for the mapping to be accepted are unblocked when they receive this message.
+ */
+public class MappingAcceptedMessage implements DiscoveryCustomMessage {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message identifier, obtained from {@link IgniteUuid#randomUuid()} when the message is created. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Mapping item this message refers to. */
+    private final MarshallerMappingItem item;
+
+    /**
+     * @param item Mapping item this message refers to.
+     */
+    MappingAcceptedMessage(MarshallerMappingItem item) {
+        this.item = item;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return this.id;
+    }
+
+    /** {@inheritDoc} Always {@code null} for this message. */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} Always {@code false} for this message. */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** @return Mapping item passed to the constructor. */
+    MarshallerMappingItem getMappingItem() {
+        return this.item;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MappingAcceptedMessage.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingExchangeResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingExchangeResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingExchangeResult.java
new file mode 100644
index 0000000..4bc1442
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingExchangeResult.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.marshaller;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ *
+ */
+public class MappingExchangeResult {
+ /** */
+ private final String acceptedClsName;
+
+ /** */
+ private final IgniteCheckedException error;
+
+ /** */
+ private final ResultType resType;
+
+ /** */
+ private enum ResultType {
+ /** */
+ SUCCESS,
+
+ /** */
+ FAILURE,
+
+ /** */
+ EXCHANGE_DISABLED
+ }
+
+ /**
+ */
+ private MappingExchangeResult(ResultType resType, String acceptedClsName, IgniteCheckedException error) {
+ this.resType = resType;
+ this.acceptedClsName = acceptedClsName;
+ this.error = error;
+ }
+
+ /** */
+ public String className() {
+ return acceptedClsName;
+ }
+
+ /** */
+ public IgniteCheckedException error() {
+ return error;
+ }
+
+ /** */
+ public boolean successful() {
+ return resType == ResultType.SUCCESS;
+ }
+
+ /** */
+ public boolean exchangeDisabled() {
+ return resType == ResultType.EXCHANGE_DISABLED;
+ }
+
+ /**
+ * @param acceptedClsName Accepted class name.
+ */
+ static MappingExchangeResult createSuccessfulResult(String acceptedClsName) {
+ assert acceptedClsName != null;
+
+ return new MappingExchangeResult(ResultType.SUCCESS, acceptedClsName, null);
+ }
+
+ /**
+ * @param error Error.
+ */
+ static MappingExchangeResult createFailureResult(IgniteCheckedException error) {
+ assert error != null;
+
+ return new MappingExchangeResult(ResultType.FAILURE, null, error);
+ }
+
+ /** */
+ static MappingExchangeResult createExchangeDisabledResult() {
+ return new MappingExchangeResult(ResultType.EXCHANGE_DISABLED, null, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
new file mode 100644
index 0000000..33a2168
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sent by a node that wants to propose a new marshaller mapping and to make sure the mapping does not conflict
+ * with mappings on other nodes in the cluster.
+ *
+ * Once the message is sent to the cluster, the sending node stays blocked until the mapping is accepted or rejected.
+ *
+ * If the message passes around the cluster ring with no conflicts observed,
+ * a {@link MappingAcceptedMessage} is sent as an acknowledgement that everything is fine.
+ */
+public class MappingProposedMessage implements DiscoveryCustomMessage {
+    /** Status of the proposal carried by this message. */
+    private enum ProposalStatus {
+        /** Initial status: neither a conflict nor a duplicate has been recorded. */
+        SUCCESSFUL,
+        /** Recorded by {@link #conflictingWithClass(String)}. */
+        IN_CONFLICT,
+        /** Recorded by {@link #markDuplicated()}. */
+        DUPLICATED
+    }
+
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message identifier, obtained from {@link IgniteUuid#randomUuid()} when the message is created. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Orig node ID; asserted to be non-null in the constructor. */
+    private final UUID origNodeId;
+
+    /** Proposed mapping item. */
+    @GridToStringInclude
+    private final MarshallerMappingItem mappingItem;
+
+    /** Current proposal status; starts as {@link ProposalStatus#SUCCESSFUL}. */
+    private ProposalStatus status = ProposalStatus.SUCCESSFUL;
+
+    /** Conflicting class name; stays {@code null} until {@link #conflictingWithClass(String)} is called. */
+    private String conflictingClsName;
+
+    /**
+     * @param mappingItem Proposed mapping item.
+     * @param origNodeId Orig node ID, asserted to be non-null.
+     */
+    MappingProposedMessage(MarshallerMappingItem mappingItem, UUID origNodeId) {
+        assert origNodeId != null;
+
+        this.origNodeId = origNodeId;
+        this.mappingItem = mappingItem;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return this.id;
+    }
+
+    /**
+     * {@inheritDoc} Yields a {@link MappingAcceptedMessage} for the mapping item unless the status is not successful.
+     */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        if (status != ProposalStatus.SUCCESSFUL)
+            return null;
+
+        return new MappingAcceptedMessage(mappingItem);
+    }
+
+    /** {@inheritDoc} Always {@code true} for this message. */
+    @Override public boolean isMutable() {
+        return true;
+    }
+
+    /** @return Proposed mapping item. */
+    MarshallerMappingItem mappingItem() {
+        return this.mappingItem;
+    }
+
+    /** @return Orig node ID passed to the constructor. */
+    UUID origNodeId() {
+        return this.origNodeId;
+    }
+
+    /** @return {@code true} if the proposal has been marked as conflicting. */
+    boolean inConflict() {
+        return ProposalStatus.IN_CONFLICT == status;
+    }
+
+    /** @return {@code true} if the proposal has been marked as duplicated. */
+    public boolean duplicated() {
+        return ProposalStatus.DUPLICATED == status;
+    }
+
+    /** @param conflClsName Name of the class the proposal conflicts with; the proposal becomes "in conflict". */
+    void conflictingWithClass(String conflClsName) {
+        conflictingClsName = conflClsName;
+        status = ProposalStatus.IN_CONFLICT;
+    }
+
+    /** Switches the proposal status to "duplicated". */
+    void markDuplicated() {
+        this.status = ProposalStatus.DUPLICATED;
+    }
+
+    /** @return Class name recorded by {@link #conflictingWithClass(String)}, or {@code null} if none was recorded. */
+    String conflictingClassName() {
+        return this.conflictingClsName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MappingProposedMessage.class, this);
+    }
+}