You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/01/19 12:05:45 UTC
[5/5] ignite git commit: IGNITE-4157 Use discovery custom messages
instead of marshaller cache - Fixes #1271.
IGNITE-4157 Use discovery custom messages instead of marshaller cache - Fixes #1271.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4cd332b7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4cd332b7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4cd332b7
Branch: refs/heads/ignite-2.0
Commit: 4cd332b781cf700b99402eed2363f988f6403602
Parents: 454b976
Author: Sergey Chugunov <se...@gmail.com>
Authored: Thu Jan 19 15:05:09 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Jan 19 15:05:09 2017 +0300
----------------------------------------------------------------------
.../configuration/IgniteConfiguration.java | 72 --
.../apache/ignite/internal/GridComponent.java | 48 +-
.../ignite/internal/GridKernalContext.java | 7 -
.../ignite/internal/GridKernalContextImpl.java | 22 +-
.../ignite/internal/GridPluginComponent.java | 21 +-
.../org/apache/ignite/internal/GridTopic.java | 3 +
.../apache/ignite/internal/IgniteKernal.java | 6 +-
.../org/apache/ignite/internal/IgnitionEx.java | 50 --
.../ignite/internal/MappingStoreTask.java | 58 ++
.../internal/MarshallerContextAdapter.java | 211 ------
.../ignite/internal/MarshallerContextImpl.java | 664 +++++++++++++------
.../internal/MarshallerMappingFileStore.java | 174 +++++
.../ignite/internal/MarshallerPlatformIds.java | 30 +
.../ignite/internal/binary/BinaryContext.java | 8 +-
.../GridClientOptimizedMarshaller.java | 16 +-
.../internal/managers/GridManagerAdapter.java | 19 +-
.../managers/communication/GridIoManager.java | 2 -
.../communication/GridIoMessageFactory.java | 12 +
.../managers/communication/GridIoPolicy.java | 7 +-
.../discovery/GridDiscoveryManager.java | 83 ++-
.../processors/GridProcessorAdapter.java | 21 +-
.../internal/processors/cache/CacheType.java | 8 +-
.../processors/cache/GridCacheAdapter.java | 1 -
.../processors/cache/GridCacheProcessor.java | 275 ++++----
.../processors/cache/GridCacheTtlManager.java | 1 -
.../processors/cache/GridCacheUtils.java | 13 +-
...ridNearOptimisticTxPrepareFutureAdapter.java | 13 +
.../cache/query/GridCacheQueryManager.java | 2 +-
.../processors/cluster/ClusterProcessor.java | 53 +-
.../continuous/GridContinuousProcessor.java | 144 ++--
.../marshaller/ClientRequestFuture.java | 183 +++++
.../GridMarshallerMappingProcessor.java | 326 +++++++++
.../processors/marshaller/MappedName.java | 63 ++
.../marshaller/MappingAcceptedMessage.java | 71 ++
.../marshaller/MappingExchangeResult.java | 96 +++
.../marshaller/MappingProposedMessage.java | 137 ++++
.../marshaller/MarshallerMappingItem.java | 99 +++
.../marshaller/MarshallerMappingTransport.java | 212 ++++++
.../MissingMappingRequestMessage.java | 146 ++++
.../MissingMappingResponseMessage.java | 169 +++++
.../platform/utils/PlatformUtils.java | 56 +-
.../plugin/IgnitePluginProcessor.java | 84 ++-
.../internal/processors/pool/PoolProcessor.java | 5 -
.../service/GridServiceProcessor.java | 9 +-
.../ignite/marshaller/MarshallerContext.java | 23 +-
.../optimized/OptimizedMarshallerUtils.java | 16 +-
.../communication/tcp/TcpCommunicationSpi.java | 0
.../ignite/spi/discovery/DiscoveryDataBag.java | 299 +++++++++
.../spi/discovery/DiscoverySpiDataExchange.java | 15 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 26 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 98 +--
.../spi/discovery/tcp/TcpDiscoverySpi.java | 71 +-
.../tcp/internal/DiscoveryDataPacket.java | 345 ++++++++++
.../TcpDiscoveryJoinRequestMessage.java | 18 +-
.../TcpDiscoveryNodeAddFinishedMessage.java | 10 +-
.../messages/TcpDiscoveryNodeAddedMessage.java | 78 +--
.../ignite/internal/GridAffinitySelfTest.java | 2 +-
.../MarshallerContextLockingSelfTest.java | 61 +-
.../binary/BinaryMarshallerSelfTest.java | 20 +-
...GridBinaryMarshallerCtxDisabledSelfTest.java | 42 +-
.../cache/GridCacheEntryMemorySizeSelfTest.java | 8 +-
.../cache/IgniteInternalCacheTypesTest.java | 11 -
...iteMarshallerCacheClassNameConflictTest.java | 273 ++++++++
...lerCacheClientRequestsMappingOnMissTest.java | 345 ++++++++++
.../cache/IgniteSystemCacheOnClientTest.java | 23 +-
.../GridBinaryCacheEntryMemorySizeSelfTest.java | 21 +-
.../IgniteCacheSystemTransactionsSelfTest.java | 18 +-
...achePartitionedMultiNodeFullApiSelfTest.java | 8 +-
...ridCacheContinuousQueryAbstractSelfTest.java | 2 +-
.../marshaller/MarshallerContextSelfTest.java | 144 +++-
.../marshaller/MarshallerContextTestImpl.java | 34 +-
.../OptimizedMarshallerEnumSelfTest.java | 25 +-
.../discovery/AbstractDiscoverySelfTest.java | 7 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 103 ++-
.../tcp/TcpDiscoverySpiStartStopSelfTest.java | 11 +-
.../testframework/junits/GridAbstractTest.java | 13 +-
.../junits/GridTestKernalContext.java | 1 -
.../junits/spi/GridSpiAbstractTest.java | 9 +-
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +-
.../testsuites/IgniteBinaryBasicTestSuite.java | 5 +
.../ignite/thread/IgniteThreadPoolSizeTest.java | 8 -
81 files changed, 4597 insertions(+), 1298 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index e0ff9b9..512ceee 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -263,12 +263,6 @@ public class IgniteConfiguration {
/** Utility cache pool keep alive time. */
private long utilityCacheKeepAliveTime = DFLT_THREAD_KEEP_ALIVE_TIME;
- /** Marshaller pool size. */
- private int marshCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
-
- /** Marshaller pool keep alive time. */
- private long marshCacheKeepAliveTime = DFLT_THREAD_KEEP_ALIVE_TIME;
-
/** P2P pool size. */
private int p2pPoolSize = DFLT_P2P_THREAD_CNT;
@@ -539,8 +533,6 @@ public class IgniteConfiguration {
lsnrs = cfg.getLocalEventListeners();
marsh = cfg.getMarshaller();
marshLocJobs = cfg.isMarshalLocalJobs();
- marshCacheKeepAliveTime = cfg.getMarshallerCacheKeepAliveTime();
- marshCachePoolSize = cfg.getMarshallerCacheThreadPoolSize();
mbeanSrv = cfg.getMBeanServer();
metricsHistSize = cfg.getMetricsHistorySize();
metricsExpTime = cfg.getMetricsExpireTime();
@@ -877,28 +869,6 @@ public class IgniteConfiguration {
}
/**
- * Default size of thread pool that is in charge of processing marshaller messages.
- * <p>
- * If not provided, executor service will have size {@link #DFLT_SYSTEM_CORE_THREAD_CNT}.
- *
- * @return Default thread pool size to be used in grid for marshaller messages.
- */
- public int getMarshallerCacheThreadPoolSize() {
- return marshCachePoolSize;
- }
-
- /**
- * Keep alive time of thread pool that is in charge of processing marshaller messages.
- * <p>
- * If not provided, executor service will have keep alive time {@link #DFLT_THREAD_KEEP_ALIVE_TIME}.
- *
- * @return Thread pool keep alive time (in milliseconds) to be used in grid for marshaller messages.
- */
- public long getMarshallerCacheKeepAliveTime() {
- return marshCacheKeepAliveTime;
- }
-
- /**
* Sets thread pool size to use within grid.
*
* @param poolSize Thread pool size to use within grid.
@@ -1019,48 +989,6 @@ public class IgniteConfiguration {
}
/**
- * Sets default thread pool size that will be used to process marshaller messages.
- *
- * @param poolSize Default executor service size to use for marshaller messages.
- * @see IgniteConfiguration#getMarshallerCacheThreadPoolSize()
- * @see IgniteConfiguration#getMarshallerCacheKeepAliveTime()
- * @return {@code this} for chaining.
- * @deprecated Use {@link #setMarshallerCacheThreadPoolSize(int)} instead.
- */
- @Deprecated
- public IgniteConfiguration setMarshallerCachePoolSize(int poolSize) {
- return setMarshallerCacheThreadPoolSize(poolSize);
- }
-
- /**
- * Sets default thread pool size that will be used to process marshaller messages.
- *
- * @param poolSize Default executor service size to use for marshaller messages.
- * @see IgniteConfiguration#getMarshallerCacheThreadPoolSize()
- * @see IgniteConfiguration#getMarshallerCacheKeepAliveTime()
- * @return {@code this} for chaining.
- */
- public IgniteConfiguration setMarshallerCacheThreadPoolSize(int poolSize) {
- marshCachePoolSize = poolSize;
-
- return this;
- }
-
- /**
- * Sets maximum thread pool size that will be used to process marshaller messages.
- *
- * @param keepAliveTime Keep alive time of executor service to use for marshaller messages.
- * @see IgniteConfiguration#getMarshallerCacheThreadPoolSize()
- * @see IgniteConfiguration#getMarshallerCacheKeepAliveTime()
- * @return {@code this} for chaining.
- */
- public IgniteConfiguration setMarshallerCacheKeepAliveTime(long keepAliveTime) {
- marshCacheKeepAliveTime = keepAliveTime;
-
- return this;
- }
-
- /**
* Should return Ignite installation home folder. If not provided, the system will check
* {@code IGNITE_HOME} system property and environment variable in that order. If
* {@code IGNITE_HOME} still could not be obtained, then grid will not start and exception
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 5c77aee..560d7f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -17,12 +17,15 @@
package org.apache.ignite.internal;
-import java.io.Serializable;
-import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.jetbrains.annotations.Nullable;
/**
@@ -43,7 +46,10 @@ public interface GridComponent {
PLUGIN,
/** */
- CLUSTER_PROC
+ CLUSTER_PROC,
+
+ /** */
+ MARSHALLER_PROC
}
/**
@@ -78,24 +84,37 @@ public interface GridComponent {
public void onKernalStop(boolean cancel);
/**
- * Gets discovery data object that will be sent to new node
- * during discovery process.
+ * Collects discovery data on joining node before sending
+ * {@link TcpDiscoveryJoinRequestMessage} request.
+ *
+ * @param dataBag container object to store discovery data in.
+ */
+ public void collectJoiningNodeData(DiscoveryDataBag dataBag);
+
+ /**
+ * Collects discovery data on nodes already in grid on receiving
+ * {@link TcpDiscoveryNodeAddedMessage}.
*
- * @param nodeId ID of new node that joins topology.
- * @return Discovery data object or {@code null} if there is nothing
- * to send for this component.
+ * @param dataBag container object to store discovery data in.
*/
- @Nullable public Serializable collectDiscoveryData(UUID nodeId);
+ public void collectGridNodeData(DiscoveryDataBag dataBag);
/**
* Receives discovery data object from remote nodes (called
* on new node during discovery process).
*
- * @param joiningNodeId Joining node ID.
- * @param rmtNodeId Remote node ID for which data is provided.
- * @param data Discovery data object or {@code null} if nothing was
+ * @param data {@link GridDiscoveryData} interface to retrieve discovery data collected on remote nodes
+ * (data common for all nodes in grid and specific for each node).
+ */
+ public void onGridDataReceived(GridDiscoveryData data);
+
+ /**
+ * Method is called on nodes that are already in grid (not on joining node).
+ * It receives discovery data from joining node.
+ *
+ * @param data {@link JoiningNodeDiscoveryData} interface to retrieve discovery data of joining node.
*/
- public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data);
+ public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data);
/**
* Prints memory statistics (sizes of internal structures, etc.).
@@ -115,7 +134,8 @@ public interface GridComponent {
/**
* Gets unique component type to distinguish components providing discovery data. Must return non-null value
- * if component implements method {@link #collectDiscoveryData(UUID)}.
+ * if component implements any of methods {@link #collectJoiningNodeData(DiscoveryDataBag)}
+ * or {@link #collectGridNodeData(DiscoveryDataBag)}.
*
* @return Unique component type for discovery data exchange.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 9157fed..b083c84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -310,13 +310,6 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public ExecutorService utilityCachePool();
/**
- * Gets marshaller cache pool.
- *
- * @return Marshaller cache pool.
- */
- public ExecutorService marshallerCachePool();
-
- /**
* Gets async callback pool.
*
* @return Async callback pool.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 8fc5b36..2f681ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -59,12 +59,13 @@ import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
-import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
+import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
import org.apache.ignite.internal.processors.igfs.IgfsHelper;
import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
+import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.odbc.OdbcProcessor;
import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor;
@@ -346,9 +347,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
private ExecutorService utilityCachePool;
/** */
- private ExecutorService marshCachePool;
-
- /** */
private IgniteConfiguration cfg;
/** */
@@ -387,7 +385,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
* @param cfg Grid configuration.
* @param gw Kernal gateway.
* @param utilityCachePool Utility cache pool.
- * @param marshCachePool Marshaller cache pool.
* @param execSvc Public executor service.
* @param sysExecSvc System executor service.
* @param stripedExecSvc Striped executor.
@@ -408,7 +405,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
IgniteConfiguration cfg,
GridKernalGateway gw,
ExecutorService utilityCachePool,
- ExecutorService marshCachePool,
ExecutorService execSvc,
ExecutorService sysExecSvc,
StripedExecutor stripedExecSvc,
@@ -421,7 +417,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
@Nullable ExecutorService idxExecSvc,
IgniteStripedThreadPoolExecutor callbackExecSvc,
List<PluginProvider> plugins
- ) throws IgniteCheckedException {
+ ) {
assert grid != null;
assert cfg != null;
assert gw != null;
@@ -430,7 +426,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
this.cfg = cfg;
this.gw = gw;
this.utilityCachePool = utilityCachePool;
- this.marshCachePool = marshCachePool;
this.execSvc = execSvc;
this.sysExecSvc = sysExecSvc;
this.stripedExecSvc = stripedExecSvc;
@@ -443,9 +438,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
this.idxExecSvc = idxExecSvc;
this.callbackExecSvc = callbackExecSvc;
- String workDir = U.workDirectory(cfg.getWorkDirectory(), cfg.getIgniteHome());
-
- marshCtx = new MarshallerContextImpl(workDir, plugins);
+ marshCtx = new MarshallerContextImpl(plugins);
try {
spring = SPRING.create(false);
@@ -570,7 +563,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
platformProc = (PlatformProcessor)comp;
else if (comp instanceof PoolProcessor)
poolProc = (PoolProcessor) comp;
- else if (!(comp instanceof DiscoveryNodeValidationProcessor))
+ else if (!(comp instanceof DiscoveryNodeValidationProcessor || comp instanceof GridMarshallerMappingProcessor))
assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass();
if (addToList)
@@ -804,11 +797,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
- @Override public ExecutorService marshallerCachePool() {
- return marshCachePool;
- }
-
- /** {@inheritDoc} */
@Override public IgniteStripedThreadPoolExecutor asyncCallbackPool() {
return callbackExecSvc;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
index 89dc243..cc1ae71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
@@ -17,14 +17,15 @@
package org.apache.ignite.internal;
-import java.io.Serializable;
-import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.plugin.PluginValidationException;
import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.jetbrains.annotations.Nullable;
/**
@@ -85,12 +86,22 @@ public class GridPluginComponent implements GridComponent {
}
/** {@inheritDoc} */
- @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
- return null;
+ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onGridDataReceived(GridDiscoveryData data) {
+ // No-op.
}
/** {@inheritDoc} */
- @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
+ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 2962540..ce528cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -100,6 +100,9 @@ public enum GridTopic {
TOPIC_IO_TEST,
/** */
+ TOPIC_MAPPING_MARSH,
+
+ /** */
TOPIC_HADOOP_MSG;
/** Enum values. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 99c3dab..20926f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -118,6 +118,7 @@ import org.apache.ignite.internal.processors.hadoop.Hadoop;
import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
+import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.odbc.OdbcProcessor;
@@ -679,7 +680,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
public void start(
final IgniteConfiguration cfg,
ExecutorService utilityCachePool,
- ExecutorService marshCachePool,
final ExecutorService execSvc,
final ExecutorService sysExecSvc,
final StripedExecutor stripedExecSvc,
@@ -789,7 +789,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
cfg,
gw,
utilityCachePool,
- marshCachePool,
execSvc,
sysExecSvc,
stripedExecSvc,
@@ -905,6 +904,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(createHadoopComponent());
startProcessor(new DataStructuresProcessor(ctx));
startProcessor(createComponent(PlatformProcessor.class, ctx));
+ startProcessor(new GridMarshallerMappingProcessor(ctx));
// Start plugins.
for (PluginProvider provider : ctx.plugins().allProviders()) {
@@ -1967,8 +1967,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
List<GridComponent> comps = ctx.components();
- ctx.marshallerContext().onKernalStop();
-
// Callback component in reverse order while kernal is still functional
// if called in the same thread, at least.
for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 9fe6fd0..c55f954 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -114,7 +114,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_LOCAL_HOST;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_SHUTDOWN_HOOK;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_RESTART_CODE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SUCCESS_FILE;
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -1481,9 +1480,6 @@ public class IgnitionEx {
/** Utility cache executor service. */
private ThreadPoolExecutor utilityCacheExecSvc;
- /** Marshaller cache executor service. */
- private ThreadPoolExecutor marshCacheExecSvc;
-
/** Affinity executor service. */
private ThreadPoolExecutor affExecSvc;
@@ -1764,18 +1760,6 @@ public class IgnitionEx {
utilityCacheExecSvc.allowCoreThreadTimeOut(true);
- validateThreadPoolSize(myCfg.getMarshallerCacheThreadPoolSize(), "marshaller cache");
-
- marshCacheExecSvc = new IgniteThreadPoolExecutor(
- "marshaller-cache",
- cfg.getGridName(),
- myCfg.getMarshallerCacheThreadPoolSize(),
- myCfg.getMarshallerCacheThreadPoolSize(),
- myCfg.getMarshallerCacheKeepAliveTime(),
- new LinkedBlockingQueue<Runnable>());
-
- marshCacheExecSvc.allowCoreThreadTimeOut(true);
-
affExecSvc = new IgniteThreadPoolExecutor(
"aff",
cfg.getGridName(),
@@ -1813,7 +1797,6 @@ public class IgnitionEx {
grid0.start(
myCfg,
utilityCacheExecSvc,
- marshCacheExecSvc,
execSvc,
sysExecSvc,
stripedExecSvc,
@@ -2068,8 +2051,6 @@ public class IgnitionEx {
public void initializeDefaultCacheConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException {
List<CacheConfiguration> cacheCfgs = new ArrayList<>();
- cacheCfgs.add(marshallerSystemCache());
-
cacheCfgs.add(utilitySystemCache());
if (IgniteComponentType.HADOOP.inClassPath())
@@ -2098,10 +2079,6 @@ public class IgnitionEx {
throw new IgniteCheckedException("Cache name cannot be \"" + CU.UTILITY_CACHE_NAME +
"\" because it is reserved for internal purposes.");
- if (CU.isMarshallerCache(ccfg.getName()))
- throw new IgniteCheckedException("Cache name cannot be \"" + CU.MARSH_CACHE_NAME +
- "\" because it is reserved for internal purposes.");
-
cacheCfgs.add(ccfg);
}
}
@@ -2277,29 +2254,6 @@ public class IgnitionEx {
}
/**
- * Creates marshaller system cache configuration.
- *
- * @return Marshaller system cache configuration.
- */
- private static CacheConfiguration marshallerSystemCache() {
- CacheConfiguration cache = new CacheConfiguration();
-
- cache.setName(CU.MARSH_CACHE_NAME);
- cache.setCacheMode(REPLICATED);
- cache.setAtomicityMode(ATOMIC);
- cache.setSwapEnabled(false);
- cache.setRebalanceMode(SYNC);
- cache.setWriteSynchronizationMode(FULL_SYNC);
- cache.setAffinity(new RendezvousAffinityFunction(false, 20));
- cache.setNodeFilter(CacheConfiguration.ALL_NODES);
- cache.setStartSize(300);
- cache.setRebalanceOrder(-2);//Prior to other system caches.
- cache.setCopyOnRead(false);
-
- return cache;
- }
-
- /**
* Creates utility system cache configuration.
*
* @return Utility system cache configuration.
@@ -2477,10 +2431,6 @@ public class IgnitionEx {
utilityCacheExecSvc = null;
- U.shutdownNow(getClass(), marshCacheExecSvc, log);
-
- marshCacheExecSvc = null;
-
U.shutdownNow(getClass(), affExecSvc, log);
affExecSvc = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/MappingStoreTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MappingStoreTask.java b/modules/core/src/main/java/org/apache/ignite/internal/MappingStoreTask.java
new file mode 100644
index 0000000..102347a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MappingStoreTask.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem;
+
+/**
+ * Task is used in {@link MarshallerContextImpl#onMappingAccepted(MarshallerMappingItem)}
+ * to offload storing mapping data into file system from discovery thread.
+ */
+class MappingStoreTask implements Runnable {
+ /** Store to put item to. */
+ private final MarshallerMappingFileStore fileStore;
+
+ /** */
+ private final byte platformId;
+
+ /** */
+ private final int typeId;
+
+ /** */
+ private final String clsName;
+
+ /**
+ * @param fileStore File store.
+ * @param platformId Platform id.
+ * @param typeId Type id.
+ * @param clsName Class name.
+ */
+ MappingStoreTask(MarshallerMappingFileStore fileStore, byte platformId, int typeId, String clsName) {
+ assert clsName != null;
+
+ this.fileStore = fileStore;
+ this.platformId = platformId;
+ this.typeId = typeId;
+ this.clsName = clsName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ fileStore.writeMapping(platformId, typeId, clsName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
deleted file mode 100644
index ad34393..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.MarshallerContext;
-import org.apache.ignite.plugin.PluginProvider;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
-
-/**
- * Marshaller context adapter.
- */
-public abstract class MarshallerContextAdapter implements MarshallerContext {
- /** */
- private static final String CLS_NAMES_FILE = "META-INF/classnames.properties";
-
- /** */
- private static final String JDK_CLS_NAMES_FILE = "META-INF/classnames-jdk.properties";
-
- /** */
- private final ConcurrentMap<Integer, String> map = new ConcurrentHashMap8<>();
-
- /** */
- private final Set<String> registeredSystemTypes = new HashSet<>();
-
- /**
- * Initializes context.
- *
- * @param plugins Plugins.
- */
- public MarshallerContextAdapter(@Nullable List<PluginProvider> plugins) {
- try {
- ClassLoader ldr = U.gridClassLoader();
-
- Enumeration<URL> urls = ldr.getResources(CLS_NAMES_FILE);
-
- boolean foundClsNames = false;
-
- while (urls.hasMoreElements()) {
- processResource(urls.nextElement());
-
- foundClsNames = true;
- }
-
- if (!foundClsNames)
- throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " +
- "[file=" + CLS_NAMES_FILE + ", ldr=" + ldr + ']');
-
- URL jdkClsNames = ldr.getResource(JDK_CLS_NAMES_FILE);
-
- if (jdkClsNames == null)
- throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " +
- "[file=" + JDK_CLS_NAMES_FILE + ", ldr=" + ldr + ']');
-
- processResource(jdkClsNames);
-
- checkHasClassName(GridDhtPartitionFullMap.class.getName(), ldr, CLS_NAMES_FILE);
- checkHasClassName(GridDhtPartitionMap2.class.getName(), ldr, CLS_NAMES_FILE);
- checkHasClassName(HashMap.class.getName(), ldr, JDK_CLS_NAMES_FILE);
-
- if (plugins != null && !plugins.isEmpty()) {
- for (PluginProvider plugin : plugins) {
- URL pluginClsNames = ldr.getResource("META-INF/" + plugin.name().toLowerCase()
- + ".classnames.properties");
-
- if (pluginClsNames != null)
- processResource(pluginClsNames);
- }
- }
- }
- catch (IOException e) {
- throw new IllegalStateException("Failed to initialize marshaller context.", e);
- }
- }
-
- /**
- * @param clsName Class name.
- * @param ldr Class loader used to get properties file.
- * @param fileName File name.
- */
- public void checkHasClassName(String clsName, ClassLoader ldr, String fileName) {
- if (!map.containsKey(clsName.hashCode()))
- throw new IgniteException("Failed to read class name from class names properties file. " +
- "Make sure class names properties file packaged with ignite binaries is not corrupted " +
- "[clsName=" + clsName + ", fileName=" + fileName + ", ldr=" + ldr + ']');
- }
-
- /**
- * @param url Resource URL.
- * @throws IOException In case of error.
- */
- private void processResource(URL url) throws IOException {
- try (InputStream in = url.openStream()) {
- BufferedReader rdr = new BufferedReader(new InputStreamReader(in));
-
- String line;
-
- while ((line = rdr.readLine()) != null) {
- if (line.isEmpty() || line.startsWith("#"))
- continue;
-
- String clsName = line.trim();
-
- int typeId = clsName.hashCode();
-
- String oldClsName;
-
- if ((oldClsName = map.put(typeId, clsName)) != null) {
- if (!oldClsName.equals(clsName))
- throw new IgniteException("Duplicate type ID [id=" + typeId + ", clsName=" + clsName +
- ", oldClsName=" + oldClsName + ']');
- }
-
- registeredSystemTypes.add(clsName);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean registerClass(int id, Class cls) throws IgniteCheckedException {
- boolean registered = true;
-
- String clsName = map.get(id);
-
- if (clsName == null) {
- registered = registerClassName(id, cls.getName());
-
- if (registered)
- map.putIfAbsent(id, cls.getName());
- }
- else if (!clsName.equals(cls.getName()))
- throw new IgniteCheckedException("Duplicate ID [id=" + id + ", oldCls=" + clsName +
- ", newCls=" + cls.getName());
-
- return registered;
- }
-
- /** {@inheritDoc} */
- @Override public Class getClass(int id, ClassLoader ldr) throws ClassNotFoundException, IgniteCheckedException {
- String clsName = map.get(id);
-
- if (clsName == null) {
- clsName = className(id);
-
- if (clsName == null)
- throw new ClassNotFoundException("Unknown type ID: " + id);
-
- String old = map.putIfAbsent(id, clsName);
-
- if (old != null)
- clsName = old;
- }
-
- return U.forName(clsName, ldr);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isSystemType(String typeName) {
- return registeredSystemTypes.contains(typeName);
- }
-
- /**
- * Registers class name.
- *
- * @param id Type ID.
- * @param clsName Class name.
- * @return Whether class name was registered.
- * @throws IgniteCheckedException In case of error.
- */
- protected abstract boolean registerClassName(int id, String clsName) throws IgniteCheckedException;
-
- /**
- * Gets class name by type ID.
- *
- * @param id Type ID.
- * @return Class name.
- * @throws IgniteCheckedException In case of error.
- */
- protected abstract String className(int id) throws IgniteCheckedException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index f3e368d..751a7d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -18,310 +18,564 @@
package org.apache.ignite.internal;
import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
-import java.nio.charset.StandardCharsets;
+import java.net.URL;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.locks.Lock;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
-import javax.cache.event.CacheEntryUpdatedListener;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
-import org.apache.ignite.internal.util.GridStripedLock;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.marshaller.MappingExchangeResult;
+import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem;
+import org.apache.ignite.internal.processors.marshaller.MarshallerMappingTransport;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.MarshallerContext;
import org.apache.ignite.plugin.PluginProvider;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.internal.MarshallerPlatformIds.JAVA_ID;
/**
* Marshaller context implementation.
*/
-public class MarshallerContextImpl extends MarshallerContextAdapter {
+public class MarshallerContextImpl implements MarshallerContext {
+ /** */
+ private static final String CLS_NAMES_FILE = "META-INF/classnames.properties";
+
/** */
- private static final GridStripedLock fileLock = new GridStripedLock(32);
+ private static final String JDK_CLS_NAMES_FILE = "META-INF/classnames-jdk.properties";
/** */
- private final CountDownLatch latch = new CountDownLatch(1);
+ private final Map<Integer, MappedName> sysTypesMap = new HashMap<>();
/** */
- private final File workDir;
+ private final Collection<String> sysTypesSet = new HashSet<>();
/** */
- private IgniteLogger log;
+ private final List<ConcurrentMap<Integer, MappedName>> allCaches = new CopyOnWriteArrayList<>();
/** */
- private volatile GridCacheAdapter<Integer, String> cache;
+ private MarshallerMappingFileStore fileStore;
- /** Non-volatile on purpose. */
- private int failedCnt;
+ /** */
+ private ExecutorService execSrvc;
+
+ /** */
+ private MarshallerMappingTransport transport;
/** */
- private ContinuousQueryListener lsnr;
+ private boolean isClientNode;
/**
- * @param igniteWorkDir Ignite work directory.
+ * Initializes context.
+ *
* @param plugins Plugins.
- * @throws IgniteCheckedException In case of error.
*/
- public MarshallerContextImpl(String igniteWorkDir, List<PluginProvider> plugins) throws IgniteCheckedException {
- super(plugins);
+ public MarshallerContextImpl(@Nullable Collection<PluginProvider> plugins) {
+ initializeCaches();
+
+ try {
+ ClassLoader ldr = U.gridClassLoader();
+
+ Enumeration<URL> urls = ldr.getResources(CLS_NAMES_FILE);
+
+ boolean foundClsNames = false;
+
+ while (urls.hasMoreElements()) {
+ processResource(urls.nextElement());
- workDir = U.resolveWorkDirectory(igniteWorkDir, "marshaller", false);
+ foundClsNames = true;
+ }
+
+ if (!foundClsNames)
+ throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " +
+ "[file=" + CLS_NAMES_FILE + ", ldr=" + ldr + ']');
+
+ URL jdkClsNames = ldr.getResource(JDK_CLS_NAMES_FILE);
+
+ if (jdkClsNames == null)
+ throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " +
+ "[file=" + JDK_CLS_NAMES_FILE + ", ldr=" + ldr + ']');
+
+ processResource(jdkClsNames);
+
+ checkHasClassName(GridDhtPartitionFullMap.class.getName(), ldr, CLS_NAMES_FILE);
+ checkHasClassName(GridDhtPartitionMap2.class.getName(), ldr, CLS_NAMES_FILE);
+ checkHasClassName(HashMap.class.getName(), ldr, JDK_CLS_NAMES_FILE);
+
+ if (plugins != null && !plugins.isEmpty()) {
+ for (PluginProvider plugin : plugins) {
+ URL pluginClsNames = ldr.getResource("META-INF/" + plugin.name().toLowerCase()
+ + ".classnames.properties");
+
+ if (pluginClsNames != null)
+ processResource(pluginClsNames);
+ }
+ }
+ }
+ catch (IOException e) {
+ throw new IllegalStateException("Failed to initialize marshaller context.", e);
+ }
+ }
+
+ /** */
+ private void initializeCaches() {
+ allCaches.add(new CombinedMap(new ConcurrentHashMap8<Integer, MappedName>(), sysTypesMap));
+ }
+
+ /** */
+ public ArrayList<Map<Integer, MappedName>> getCachedMappings() {
+ ArrayList<Map<Integer, MappedName>> result = new ArrayList<>(allCaches.size());
+
+ for (int i = 0; i < allCaches.size(); i++) {
+ Map res;
+
+ if (i == JAVA_ID)
+ res = ((CombinedMap) allCaches.get(JAVA_ID)).userMap;
+ else
+ res = allCaches.get(i);
+
+ if (!res.isEmpty())
+ result.add(res);
+ }
+
+ return result;
}
/**
- * @param ctx Context.
- * @throws IgniteCheckedException If failed.
+ * @param platformId Platform id.
+ * @param marshallerMapping Marshaller mapping.
*/
- public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
- if (ctx.clientNode()) {
- lsnr = new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir);
-
- ctx.continuous().registerStaticRoutine(
- CU.MARSH_CACHE_NAME,
- lsnr,
- null,
- null);
- }
+ public void onMappingDataReceived(byte platformId, Map<Integer, MappedName> marshallerMapping) {
+ ConcurrentMap<Integer, MappedName> platformCache = getCacheFor(platformId);
+
+ for (Map.Entry<Integer, MappedName> e : marshallerMapping.entrySet())
+ platformCache.put(e.getKey(), new MappedName(e.getValue().className(), true));
}
/**
- * @param ctx Kernal context.
- * @throws IgniteCheckedException In case of error.
+ * @param clsName Class name.
+ * @param ldr Class loader used to get properties file.
+ * @param fileName File name.
*/
- public void onMarshallerCacheStarted(GridKernalContext ctx) throws IgniteCheckedException {
- assert ctx != null;
+ private void checkHasClassName(String clsName, ClassLoader ldr, String fileName) {
+ ConcurrentMap cache = getCacheFor(JAVA_ID);
+
+ if (!cache.containsKey(clsName.hashCode()))
+ throw new IgniteException("Failed to read class name from class names properties file. " +
+ "Make sure class names properties file packaged with ignite binaries is not corrupted " +
+ "[clsName=" + clsName + ", fileName=" + fileName + ", ldr=" + ldr + ']');
+ }
+
+ /**
+ * @param url Resource URL.
+ * @throws IOException In case of error.
+ */
+ private void processResource(URL url) throws IOException {
+ try (InputStream in = url.openStream()) {
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(in));
+
+ String line;
+
+ while ((line = rdr.readLine()) != null) {
+ if (line.isEmpty() || line.startsWith("#"))
+ continue;
- log = ctx.log(MarshallerContextImpl.class);
+ String clsName = line.trim();
- cache = ctx.cache().marshallerCache();
+ int typeId = clsName.hashCode();
- if (ctx.cache().marshallerCache().context().affinityNode()) {
- ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery(
- new ContinuousQueryListener(log, workDir),
- null,
- true,
- true,
- false
- );
+ MappedName oldClsName;
+
+ if ((oldClsName = sysTypesMap.put(typeId, new MappedName(clsName, true))) != null) {
+ if (!oldClsName.className().equals(clsName))
+ throw new IgniteException(
+ "Duplicate type ID [id="
+ + typeId
+ + ", oldClsName="
+ + oldClsName
+ + ", clsName="
+ + clsName + ']');
+ }
+
+ sysTypesSet.add(clsName);
+ }
}
- else {
- if (lsnr != null) {
- ctx.closure().runLocalSafe(new Runnable() {
- @SuppressWarnings("unchecked")
- @Override public void run() {
- try {
- Iterable entries = cache.context().continuousQueries().existingEntries(false, null);
-
- lsnr.onUpdated(entries);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to load marshaller cache entries: " + e, e);
- }
- }
- });
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean registerClassName(
+ byte platformId,
+ int typeId,
+ String clsName
+ ) throws IgniteCheckedException {
+ ConcurrentMap<Integer, MappedName> cache = getCacheFor(platformId);
+
+ MappedName mappedName = cache.get(typeId);
+
+ if (mappedName != null) {
+ if (!mappedName.className().equals(clsName))
+ throw duplicateIdException(platformId, typeId, mappedName.className(), clsName);
+ else {
+ if (mappedName.accepted())
+ return true;
+
+ if (transport.stopping())
+ return false;
+
+ IgniteInternalFuture<MappingExchangeResult> fut = transport.awaitMappingAcceptance(new MarshallerMappingItem(platformId, typeId, clsName), cache);
+ MappingExchangeResult res = fut.get();
+
+ return convertXchRes(res);
}
}
+ else {
+ if (transport.stopping())
+ return false;
- latch.countDown();
+ IgniteInternalFuture<MappingExchangeResult> fut = transport.proposeMapping(new MarshallerMappingItem(platformId, typeId, clsName), cache);
+ MappingExchangeResult res = fut.get();
+
+ return convertXchRes(res);
+ }
+ }
+
+ /**
+ * @param res result of exchange.
+ */
+ private boolean convertXchRes(MappingExchangeResult res) throws IgniteCheckedException {
+ if (res.successful())
+ return true;
+ else if (res.exchangeDisabled())
+ return false;
+ else {
+ assert res.error() != null;
+ throw res.error();
+ }
}
/**
- * Release marshaller context.
+ * @param platformId Platform id.
+ * @param typeId Type id.
+ * @param conflictingClsName Conflicting class name.
+ * @param clsName Class name.
*/
- public void onKernalStop() {
- latch.countDown();
+ private IgniteCheckedException duplicateIdException(
+ byte platformId,
+ int typeId,
+ String conflictingClsName,
+ String clsName
+ ) {
+ return new IgniteCheckedException("Duplicate ID [platformId="
+ + platformId
+ + ", typeId="
+ + typeId
+ + ", oldCls="
+ + conflictingClsName
+ + ", newCls="
+ + clsName + "]");
+ }
+
+ /**
+ *
+ * @param item type mapping to propose
+ * @return false if there is a conflict with another mapping in local cache, true otherwise.
+ */
+ public String onMappingProposed(MarshallerMappingItem item) {
+ ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId());
+
+ MappedName newName = new MappedName(item.className(), false);
+ MappedName oldName;
+
+ if ((oldName = cache.putIfAbsent(item.typeId(), newName)) == null)
+ return null;
+ else
+ return oldName.className();
+ }
+
+ /**
+ * @param item Item.
+ */
+ public void onMappingAccepted(final MarshallerMappingItem item) {
+ ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId());
+
+ cache.replace(item.typeId(), new MappedName(item.className(), true));
+
+ execSrvc.submit(new MappingStoreTask(fileStore, item.platformId(), item.typeId(), item.className()));
}
/** {@inheritDoc} */
- @Override protected boolean registerClassName(int id, String clsName) throws IgniteCheckedException {
- GridCacheAdapter<Integer, String> cache0 = cache;
+ @Override public Class getClass(int typeId, ClassLoader ldr) throws ClassNotFoundException, IgniteCheckedException {
+ String clsName = getClassName(JAVA_ID, typeId);
- if (cache0 == null)
- return false;
+ if (clsName == null)
+ throw new ClassNotFoundException("Unknown type ID: " + typeId);
- String old;
+ return U.forName(clsName, ldr);
+ }
- try {
- old = cache0.tryGetAndPut(id, clsName);
+ /** {@inheritDoc} */
+ @Override public String getClassName(
+ byte platformId,
+ int typeId
+ ) throws ClassNotFoundException, IgniteCheckedException {
+ ConcurrentMap<Integer, MappedName> cache = getCacheFor(platformId);
- if (old != null && !old.equals(clsName))
- throw new IgniteCheckedException("Type ID collision detected [id=" + id + ", clsName1=" + clsName +
- ", clsName2=" + old + ']');
+ MappedName mappedName = cache.get(typeId);
- failedCnt = 0;
+ String clsName;
- return true;
- }
- catch (CachePartialUpdateCheckedException | GridCacheTryPutFailedException e) {
- if (++failedCnt > 10) {
- if (log.isQuiet())
- U.quiet(false, "Failed to register marshalled class for more than 10 times in a row " +
- "(may affect performance).");
+ if (mappedName != null)
+ clsName = mappedName.className();
+ else {
+ clsName = fileStore.readMapping(platformId, typeId);
- failedCnt = 0;
- }
+ if (clsName != null)
+ cache.putIfAbsent(typeId, new MappedName(clsName, true));
+ else
+ if (isClientNode) {
+ mappedName = cache.get(typeId);
- return false;
+ if (mappedName == null) {
+ GridFutureAdapter<MappingExchangeResult> fut = transport.requestMapping(
+ new MarshallerMappingItem(platformId, typeId, null),
+ cache);
+
+ clsName = fut.get().className();
+ }
+ else
+ clsName = mappedName.className();
+
+ if (clsName == null)
+ throw new ClassNotFoundException(
+ "Requesting mapping from grid failed for [platformId="
+ + platformId
+ + ", typeId="
+ + typeId + "]");
+
+ return clsName;
+ }
+ else
+ throw new ClassNotFoundException(
+ "Unknown pair [platformId= "
+ + platformId
+ + ", typeId="
+ + typeId + "]");
}
+
+ return clsName;
}
- /** {@inheritDoc} */
- @Override public String className(int id) throws IgniteCheckedException {
- GridCacheAdapter<Integer, String> cache0 = cache;
+ /**
+ * @param platformId Platform id.
+ * @param typeId Type id.
+ */
+ public String resolveMissedMapping(byte platformId, int typeId) {
+ ConcurrentMap<Integer, MappedName> cache = getCacheFor(platformId);
- if (cache0 == null) {
- U.awaitQuiet(latch);
+ MappedName mappedName = cache.get(typeId);
- cache0 = cache;
+ if (mappedName != null) {
+ assert mappedName.accepted() : mappedName;
- if (cache0 == null)
- throw new IllegalStateException("Failed to initialize marshaller context (grid is stopping).");
+ return mappedName.className();
}
- String clsName = cache0.getTopologySafe(id);
+ return null;
+ }
+
+ /**
+ * @param item Item.
+ * @param resolvedClsName Resolved class name.
+ */
+ public void onMissedMappingResolved(MarshallerMappingItem item, String resolvedClsName) {
+ ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId());
+
+ int typeId = item.typeId();
+ MappedName mappedName = cache.get(typeId);
+
+ if (mappedName != null)
+ assert resolvedClsName.equals(mappedName.className()) :
+ "Class name resolved from cluster: "
+ + resolvedClsName
+ + ", class name from local cache: "
+ + mappedName.className();
+ else {
+ mappedName = new MappedName(resolvedClsName, true);
+ cache.putIfAbsent(typeId, mappedName);
+
+ execSrvc.submit(new MappingStoreTask(fileStore, item.platformId(), item.typeId(), resolvedClsName));
+ }
+ }
- if (clsName == null) {
- String fileName = id + ".classname";
+ /** {@inheritDoc} */
+ @Override public boolean isSystemType(String typeName) {
+ return sysTypesSet.contains(typeName);
+ }
- Lock lock = fileLock(fileName);
+ /**
+ * @param platformId Platform id.
+ */
+ private ConcurrentMap<Integer, MappedName> getCacheFor(byte platformId) {
+ ConcurrentMap<Integer, MappedName> map;
- lock.lock();
+ if (platformId < allCaches.size()) {
+ map = allCaches.get(platformId);
- try {
- File file = new File(workDir, fileName);
+ if (map != null)
+ return map;
+ }
- try (FileInputStream in = new FileInputStream(file)) {
- FileLock fileLock = fileLock(in.getChannel(), true);
+ synchronized (this) {
+ int size = allCaches.size();
- assert fileLock != null : fileName;
+ if (platformId < size) {
+ map = allCaches.get(platformId);
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
- clsName = reader.readLine();
- }
+ if (map == null) {
+ map = new ConcurrentHashMap8<>();
+ allCaches.set(platformId, map);
}
- catch (IOException e) {
- throw new IgniteCheckedException("Class definition was not found " +
- "at marshaller cache and local file. " +
- "[id=" + id + ", file=" + file.getAbsolutePath() + ']');
- }
- }
- finally {
- lock.unlock();
}
+ else {
+ map = new ConcurrentHashMap8<>();
- // Must explicitly put entry to cache to invoke other continuous queries.
- registerClassName(id, clsName);
+ putAtIndex(map, allCaches, platformId, size);
+ }
}
- return clsName;
+ return map;
}
/**
- * @param fileName File name.
- * @return Lock instance.
+ * @param map Map.
+ * @param allCaches All caches.
+ * @param targetIdx Target index.
+ * @param size Size.
*/
- private static Lock fileLock(String fileName) {
- return fileLock.getLock(fileName.hashCode());
+ private static void putAtIndex(
+ ConcurrentMap<Integer, MappedName> map,
+ Collection<ConcurrentMap<Integer, MappedName>> allCaches,
+ byte targetIdx,
+ int size
+ ) {
+ int lastIdx = size - 1;
+
+ int nullElemsToAdd = targetIdx - lastIdx - 1;
+
+ for (int i = 0; i < nullElemsToAdd; i++)
+ allCaches.add(null);
+
+ allCaches.add(map);
}
/**
- * @param ch File channel.
- * @param shared Shared.
+ * @param ctx Context.
+ * @param transport Transport.
*/
- private static FileLock fileLock(
- FileChannel ch,
- boolean shared
- ) throws IOException, IgniteInterruptedCheckedException {
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ public void onMarshallerProcessorStarted(
+ GridKernalContext ctx,
+ MarshallerMappingTransport transport
+ ) throws IgniteCheckedException {
+ assert ctx != null;
- while (true) {
- FileLock fileLock = ch.tryLock(0L, Long.MAX_VALUE, shared);
+ IgniteConfiguration cfg = ctx.config();
+ String workDir = U.workDirectory(cfg.getWorkDirectory(), cfg.getIgniteHome());
- if (fileLock == null)
- U.sleep(rnd.nextLong(50));
- else
- return fileLock;
- }
+ fileStore = new MarshallerMappingFileStore(workDir, ctx.log(MarshallerMappingFileStore.class));
+ this.transport = transport;
+ execSrvc = ctx.getSystemExecutorService();
+ isClientNode = ctx.clientNode();
+ }
+
+ /**
+ *
+ */
+ public void onMarshallerProcessorStop() {
+ transport.markStopping();
}
/**
+ *
*/
- public static class ContinuousQueryListener implements CacheEntryUpdatedListener<Integer, String> {
+ static final class CombinedMap extends AbstractMap<Integer, MappedName>
+ implements ConcurrentMap<Integer, MappedName> {
/** */
- private final IgniteLogger log;
+ private final ConcurrentMap<Integer, MappedName> userMap;
/** */
- private final File workDir;
+ private final Map<Integer, MappedName> sysMap;
/**
- * @param log Logger.
- * @param workDir Work directory.
+ * @param userMap User map.
+ * @param sysMap System map.
*/
- public ContinuousQueryListener(IgniteLogger log, File workDir) {
- this.log = log;
- this.workDir = workDir;
+ CombinedMap(ConcurrentMap<Integer, MappedName> userMap, Map<Integer, MappedName> sysMap) {
+ this.userMap = userMap;
+ this.sysMap = sysMap;
}
/** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts)
- throws CacheEntryListenerException {
- for (CacheEntryEvent<? extends Integer, ? extends String> evt : evts) {
- assert evt.getOldValue() == null || F.eq(evt.getOldValue(), evt.getValue()):
- "Received cache entry update for system marshaller cache: " + evt;
-
- if (evt.getOldValue() == null) {
- String fileName = evt.getKey() + ".classname";
-
- Lock lock = fileLock(fileName);
-
- lock.lock();
-
- try {
- File file = new File(workDir, fileName);
-
- try (FileOutputStream out = new FileOutputStream(file)) {
- FileLock fileLock = fileLock(out.getChannel(), false);
-
- assert fileLock != null : fileName;
-
- try (Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
- writer.write(evt.getValue());
-
- writer.flush();
- }
- }
- catch (IOException e) {
- U.error(log, "Failed to write class name to file [id=" + evt.getKey() +
- ", clsName=" + evt.getValue() + ", file=" + file.getAbsolutePath() + ']', e);
- }
- catch(OverlappingFileLockException ignored) {
- if (log.isDebugEnabled())
- log.debug("File already locked (will ignore): " + file.getAbsolutePath());
- }
- catch (IgniteInterruptedCheckedException e) {
- U.error(log, "Interrupted while waiting for acquiring file lock: " + file, e);
- }
- }
- finally {
- lock.unlock();
- }
- }
- }
+ @Override public Set<Entry<Integer, MappedName>> entrySet() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public MappedName putIfAbsent(@NotNull Integer key, MappedName val) {
+ return userMap.putIfAbsent(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean remove(@NotNull Object key, Object val) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean replace(@NotNull Integer key, @NotNull MappedName oldVal, @NotNull MappedName newVal) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public MappedName replace(@NotNull Integer key, @NotNull MappedName val) {
+ return userMap.replace(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public MappedName get(Object key) {
+ MappedName res = sysMap.get(key);
+
+ if (res != null)
+ return res;
+
+ return userMap.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public MappedName put(Integer key, MappedName val) {
+ return userMap.put(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean containsKey(Object key) {
+ return userMap.containsKey(key) || sysMap.containsKey(key);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
new file mode 100644
index 0000000..03f79c9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.Lock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.GridStripedLock;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * File-based persistence provider for {@link MarshallerContextImpl}.
+ *
+ * Saves mappings in format <b>{typeId}.classname{platformId}</b>, e.g. 123.classname0.
+ *
+ * It writes new mapping when it is accepted by all grid members and reads mapping
+ * when a classname is requested but is not presented in local cache of {@link MarshallerContextImpl}.
+ */
+final class MarshallerMappingFileStore {
+ /** */
+ private static final GridStripedLock fileLock = new GridStripedLock(32);
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private final File workDir;
+
+ /**
+ * @param log Logger.
+ */
+ MarshallerMappingFileStore(String igniteWorkDir, IgniteLogger log) throws IgniteCheckedException {
+ workDir = U.resolveWorkDirectory(igniteWorkDir, "marshaller", false);
+ this.log = log;
+ }
+
+ /**
+ * @param platformId Platform id.
+ * @param typeId Type id.
+ * @param typeName Type name.
+ */
+ void writeMapping(byte platformId, int typeId, String typeName) {
+ String fileName = getFileName(platformId, typeId);
+
+ Lock lock = fileLock(fileName);
+
+ lock.lock();
+
+ try {
+ File file = new File(workDir, fileName);
+
+ try (FileOutputStream out = new FileOutputStream(file)) {
+ FileLock fileLock = fileLock(out.getChannel(), false);
+
+ assert fileLock != null : fileName;
+
+ try (Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
+ writer.write(typeName);
+
+ writer.flush();
+ }
+ }
+ catch (IOException e) {
+ U.error(log, "Failed to write class name to file [platformId=" + platformId + "id=" + typeId +
+ ", clsName=" + typeName + ", file=" + file.getAbsolutePath() + ']', e);
+ }
+ catch(OverlappingFileLockException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("File already locked (will ignore): " + file.getAbsolutePath());
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.error(log, "Interrupted while waiting for acquiring file lock: " + file, e);
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @param platformId Platform id.
+ * @param typeId Type id.
+ */
+ String readMapping(byte platformId, int typeId) throws IgniteCheckedException {
+ String fileName = getFileName(platformId, typeId);
+
+ Lock lock = fileLock(fileName);
+
+ lock.lock();
+
+ try {
+ File file = new File(workDir, fileName);
+
+ try (FileInputStream in = new FileInputStream(file)) {
+ FileLock fileLock = fileLock(in.getChannel(), true);
+
+ assert fileLock != null : fileName;
+
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
+ return reader.readLine();
+ }
+ }
+ catch (IOException ignored) {
+ return null;
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @param platformId Platform id.
+ * @param typeId Type id.
+ */
+ private String getFileName(byte platformId, int typeId) {
+ return typeId + ".classname" + platformId;
+ }
+
+ /**
+ * @param fileName File name.
+ * @return Lock instance.
+ */
+ private static Lock fileLock(String fileName) {
+ return fileLock.getLock(fileName.hashCode());
+ }
+
+ /**
+ * @param ch File channel.
+ * @param shared Shared.
+ */
+ private static FileLock fileLock(
+ FileChannel ch,
+ boolean shared
+ ) throws IOException, IgniteInterruptedCheckedException {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (true) {
+ FileLock fileLock = ch.tryLock(0L, Long.MAX_VALUE, shared);
+
+ if (fileLock == null)
+ U.sleep(rnd.nextLong(50));
+ else
+ return fileLock;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/MarshallerPlatformIds.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerPlatformIds.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerPlatformIds.java
new file mode 100644
index 0000000..458ae49
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerPlatformIds.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+/**
+ * Constants for platform IDs to feed into {@link org.apache.ignite.marshaller.MarshallerContext}.
+ */
+public final class MarshallerPlatformIds {
+ /** */
+ public static final byte JAVA_ID = 0;
+
+ /** */
+ private MarshallerPlatformIds() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index 2237c27..b291872 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -118,6 +118,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
+import static org.apache.ignite.internal.MarshallerPlatformIds.JAVA_ID;
+
/**
* Binary context.
*/
@@ -764,7 +766,7 @@ public class BinaryContext {
final int typeId = mapper.typeId(clsName);
try {
- registered = marshCtx.registerClass(typeId, cls);
+ registered = marshCtx.registerClassName(JAVA_ID, typeId, cls.getName());
}
catch (IgniteCheckedException e) {
throw new BinaryObjectException("Failed to register class.", e);
@@ -807,7 +809,7 @@ public class BinaryContext {
boolean registered;
try {
- registered = marshCtx.registerClass(desc.typeId(), desc.describedClass());
+ registered = marshCtx.registerClassName(JAVA_ID, desc.typeId(), desc.describedClass().getName());
}
catch (IgniteCheckedException e) {
throw new BinaryObjectException("Failed to register class.", e);
@@ -844,7 +846,7 @@ public class BinaryContext {
* @param cls Class.
* @return Serializer for class or {@code null} if none exists.
*/
- private @Nullable BinarySerializer serializerForClass(Class cls) {
+ @Nullable private BinarySerializer serializerForClass(Class cls) {
BinarySerializer serializer = defaultSerializer();
if (serializer == null && canUseReflectiveSerializer(cls))
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
index 3c65db6..6a4b5e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.MarshallerContextAdapter;
+import org.apache.ignite.internal.MarshallerContextImpl;
import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -31,7 +31,7 @@ import org.apache.ignite.plugin.PluginProvider;
import org.jetbrains.annotations.Nullable;
/**
- * Wrapper, that adapts {@link org.apache.ignite.marshaller.optimized.OptimizedMarshaller} to
+ * Wrapper, that adapts {@link OptimizedMarshaller} to
* {@link GridClientMarshaller} interface.
*/
public class GridClientOptimizedMarshaller implements GridClientMarshaller {
@@ -114,7 +114,7 @@ public class GridClientOptimizedMarshaller implements GridClientMarshaller {
/**
*/
- private static class ClientMarshallerContext extends MarshallerContextAdapter {
+ private static class ClientMarshallerContext extends MarshallerContextImpl {
/** */
public ClientMarshallerContext() {
super(null);
@@ -126,15 +126,5 @@ public class GridClientOptimizedMarshaller implements GridClientMarshaller {
public ClientMarshallerContext(@Nullable List<PluginProvider> plugins) {
super(plugins);
}
-
- /** {@inheritDoc} */
- @Override protected boolean registerClassName(int id, String clsName) {
- throw new UnsupportedOperationException(clsName);
- }
-
- /** {@inheritDoc} */
- @Override protected String className(int id) {
- throw new UnsupportedOperationException();
- }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 584cc56..59af748 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -59,6 +59,9 @@ import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiNoop;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -604,12 +607,22 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
}
/** {@inheritDoc} */
- @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
- return null;
+ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onGridDataReceived(GridDiscoveryData data) {
+ // No-op.
}
/** {@inheritDoc} */
- @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
+ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index de34adb..16ea972 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -94,7 +94,6 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.DAT
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
@@ -684,7 +683,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
case MANAGEMENT_POOL:
case AFFINITY_POOL:
case UTILITY_CACHE_POOL:
- case MARSH_CACHE_POOL:
case IDX_POOL:
case IGFS_POOL:
case DATA_STREAMER_POOL:
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index b1fe910..e283bdc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -131,6 +131,8 @@ import org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange;
import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerRequest;
import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse;
import org.apache.ignite.internal.processors.igfs.IgfsSyncMessage;
+import org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage;
+import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@ -806,6 +808,16 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 120:
+ msg = new MissingMappingRequestMessage();
+
+ break;
+
+ case 121:
+ msg = new MissingMappingResponseMessage();
+
+ break;
+
case 124:
msg = new GridMessageCollection<>();