You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/07 11:16:57 UTC
[12/50] [abbrv] ignite git commit: IGNITE-2649: Ensured correct local
Ignite instance processing during serialization and deserialization.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 6194552..0498c20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -28,7 +28,9 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -133,10 +135,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
super.prepareMarshal(ctx);
if (parts != null && partsBytes == null)
- partsBytes = ctx.marshaller().marshal(parts);
+ partsBytes = CU.marshal(ctx, parts);
if (partCntrs != null)
- partCntrsBytes = ctx.marshaller().marshal(partCntrs);
+ partCntrsBytes = CU.marshal(ctx, partCntrs);
}
/**
@@ -157,14 +159,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (partsBytes != null && parts == null)
- parts = ctx.marshaller().unmarshal(partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (partsBytes != null && parts == null) {
+ parts = MarshallerUtils.unmarshal(ctx.gridName(), ctx.marshaller(), partsBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
if (parts == null)
parts = new HashMap<>();
- if (partCntrsBytes != null && partCntrs == null)
- partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (partCntrsBytes != null && partCntrs == null) {
+ partCntrs = MarshallerUtils.unmarshal(ctx.gridName(), ctx.marshaller(), partCntrsBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
if (partCntrs == null)
partCntrs = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 0fe4259..38307e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -27,7 +27,9 @@ import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -137,21 +139,25 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
super.prepareMarshal(ctx);
if (partsBytes == null && parts != null)
- partsBytes = ctx.marshaller().marshal(parts);
+ partsBytes = CU.marshal(ctx, parts);
if (partCntrs != null)
- partCntrsBytes = ctx.marshaller().marshal(partCntrs);
+ partCntrsBytes = CU.marshal(ctx, partCntrs);
}
/** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (partsBytes != null && parts == null)
- parts = ctx.marshaller().unmarshal(partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (partsBytes != null && parts == null) {
+ parts = MarshallerUtils.unmarshal(ctx.gridName(), ctx.marshaller(), partsBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
- if (partCntrsBytes != null)
- partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (partCntrsBytes != null) {
+ partCntrs = MarshallerUtils.unmarshal(ctx.gridName(), ctx.marshaller(), partCntrsBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index be92551..03ddf43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -34,9 +34,11 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -190,7 +192,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
}
if (err != null)
- errBytes = ctx.marshaller().marshal(err);
+ errBytes = CU.marshal(ctx, err);
}
/** {@inheritDoc} */
@@ -204,8 +206,10 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
info.unmarshal(cctx, ldr);
}
- if (errBytes != null)
- err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (errBytes != null) {
+ err = MarshallerUtils.unmarshal(ctx.gridName(), ctx.marshaller(), errBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
index e8d164f..19b2163 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
@@ -27,7 +27,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -169,7 +171,7 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC
}
if (err != null)
- errBytes = ctx.marshaller().marshal(err);
+ errBytes = CU.marshal(ctx, err);
}
/** {@inheritDoc} */
@@ -187,8 +189,10 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC
((GridCacheEntryInfo)res).unmarshal(cctx, ldr);
}
- if (errBytes != null && err == null)
- err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (errBytes != null && err == null) {
+ err = MarshallerUtils.unmarshal(ctx.gridName(), ctx.marshaller(), errBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index 4055b2a..e01eb9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -24,9 +24,11 @@ import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -101,15 +103,17 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
super.prepareMarshal(ctx);
if (err != null)
- errBytes = ctx.marshaller().marshal(err);
+ errBytes = CU.marshal(ctx, err);
}
/** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null)
- err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (errBytes != null) {
+ err = MarshallerUtils.unmarshal(ctx.gridName(), ctx.marshaller(), errBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
index 91ae12c..7f33220 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
/**
* Local query future.
@@ -113,11 +114,11 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
Marshaller marsh = cctx.marshaller();
IgniteReducer<Object, Object> rdc = qry.reducer() != null ?
- marsh.<IgniteReducer<Object, Object>>unmarshal(marsh.marshal(qry.reducer()),
+ MarshallerUtils.marshalUnmarshal(cctx.gridName(), marsh, qry.reducer(),
U.resolveClassLoader(cctx.gridConfig())) : null;
IgniteClosure<Object, Object> trans = qry.transform() != null ?
- marsh.<IgniteClosure<Object, Object>>unmarshal(marsh.marshal(qry.transform()),
+ MarshallerUtils.marshalUnmarshal(cctx.gridName(), marsh, qry.transform(),
U.resolveClassLoader(cctx.gridConfig())) : null;
return new GridCacheQueryInfo(
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 88ebe99..08cbb29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -316,19 +317,25 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- Marshaller mrsh = ctx.marshaller();
+ Marshaller marsh = ctx.marshaller();
- if (keyValFilterBytes != null)
- keyValFilter = mrsh.unmarshal(keyValFilterBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (keyValFilterBytes != null) {
+ keyValFilter = MarshallerUtils.unmarshal(ctx.gridName(), marsh, keyValFilterBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
if (rdcBytes != null)
- rdc = mrsh.unmarshal(rdcBytes, ldr);
+ rdc = MarshallerUtils.unmarshal(ctx.gridName(), marsh, rdcBytes, ldr);
- if (transBytes != null)
- trans = mrsh.unmarshal(transBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (transBytes != null) {
+ trans = MarshallerUtils.unmarshal(ctx.gridName(), marsh, transBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
- if (argsBytes != null)
- args = mrsh.unmarshal(argsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (argsBytes != null) {
+ args = MarshallerUtils.unmarshal(ctx.gridName(), marsh, argsBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
}
/** {@inheritDoc} */
@@ -343,9 +350,10 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
void beforeLocalExecution(GridCacheContext ctx) throws IgniteCheckedException {
Marshaller marsh = ctx.marshaller();
- rdc = rdc != null ? marsh.<IgniteReducer<Object, Object>>unmarshal(marsh.marshal(rdc),
+ rdc = rdc != null ? MarshallerUtils.marshalUnmarshal(ctx.gridName(), marsh, rdc,
U.resolveClassLoader(ctx.gridConfig())) : null;
- trans = trans != null ? marsh.<IgniteClosure<Object, Object>>unmarshal(marsh.marshal(trans),
+
+ trans = trans != null ? MarshallerUtils.marshalUnmarshal(ctx.gridName(), marsh, trans,
U.resolveClassLoader(ctx.gridConfig())) : null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index 2cde8f4..a015ac5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -32,7 +32,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -124,7 +126,7 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
GridCacheContext cctx = ctx.cacheContext(cacheId);
if (err != null)
- errBytes = ctx.marshaller().marshal(err);
+ errBytes = CU.marshal(ctx, err);
metaDataBytes = marshalCollection(metadata, cctx);
dataBytes = marshalCollection(data, cctx);
@@ -145,8 +147,10 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null)
- err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (errBytes != null) {
+ err = MarshallerUtils.unmarshal(ctx.gridName(), ctx.marshaller(), errBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
metadata = unmarshalCollection(metaDataBytes, ctx, ldr);
data = unmarshalCollection(dataBytes, ctx, ldr);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index 0733827..7a77ea5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -132,19 +133,21 @@ public class GridCacheSqlQuery implements Message {
/**
* @param m Marshaller.
+ * @param kernalCtx Kernal context.
* @throws IgniteCheckedException If failed.
*/
- public void marshallParams(Marshaller m) throws IgniteCheckedException {
+ public void marshallParams(Marshaller m, final GridKernalContext kernalCtx) throws IgniteCheckedException {
if (paramsBytes != null)
return;
assert params != null;
- paramsBytes = m.marshal(params);
+ paramsBytes = MarshallerUtils.marshal(kernalCtx.gridName(), m, params);
}
/**
* @param m Marshaller.
+ * @param ctx Kernal context.
* @throws IgniteCheckedException If failed.
*/
public void unmarshallParams(Marshaller m, GridKernalContext ctx) throws IgniteCheckedException {
@@ -153,7 +156,7 @@ public class GridCacheSqlQuery implements Message {
assert paramsBytes != null;
- params = m.unmarshal(paramsBytes, U.resolveClassLoader(ctx.config()));
+ params = MarshallerUtils.unmarshal(ctx.gridName(), m, paramsBytes, U.resolveClassLoader(ctx.config()));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index fc38eba..627a04b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -1541,7 +1542,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
depInfo = new GridDeploymentInfoBean(dep);
- bytes = ctx.config().getMarshaller().marshal(obj);
+ bytes = MarshallerUtils.marshal(ctx, obj);
}
/**
@@ -1559,7 +1560,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
- return ctx.config().getMarshaller().unmarshal(bytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ return MarshallerUtils.unmarshal(ctx.gridName(), ctx.config().getMarshaller(), bytes,
+ U.resolveClassLoader(dep.classLoader(), ctx.config()));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java
index a8b2da4..d659f10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
@@ -153,7 +154,7 @@ public class GridCacheQueryJdbcMetadataTask extends ComputeTaskAdapter<String, b
status = 0;
- data = MARSHALLER.marshal(F.asList(schemasMap, indexesInfo));
+ data = MarshallerUtils.marshal(ignite.name(), MARSHALLER, F.asList(schemasMap, indexesInfo));
}
catch (Throwable t) {
U.error(log, "Failed to get metadata for JDBC.", t);
@@ -163,7 +164,7 @@ public class GridCacheQueryJdbcMetadataTask extends ComputeTaskAdapter<String, b
status = 1;
try {
- data = MARSHALLER.marshal(err);
+ data = MarshallerUtils.marshal(ignite.name(), MARSHALLER, err);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
index 5c60762..a6c5f14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.util.typedef.CAX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
@@ -82,7 +83,7 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> {
try {
assert arg != null;
- Map<String, Object> args = MARSHALLER.unmarshal(arg, null);
+ Map<String, Object> args = MarshallerUtils.unmarshal(ignite.name(), MARSHALLER, arg, null);
boolean first = true;
@@ -130,12 +131,13 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> {
if (res.getException() == null) {
status = 0;
- bytes = MARSHALLER.marshal(res.getData());
+ bytes = MarshallerUtils.marshal(ignite.name(), MARSHALLER, res.getData());
}
else {
status = 1;
- bytes = MARSHALLER.marshal(new SQLException(res.getException().getMessage()));
+ bytes = MarshallerUtils.marshal(ignite.name(), MARSHALLER, new SQLException(res.getException().getMessage())
+ );
}
byte[] packet = new byte[bytes.length + 1];
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 3258ce9..e7fc5de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -878,8 +879,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
// Unmarshal transform closure anyway if it exists.
if (transformClosBytes != null && entryProcessorsCol == null)
- entryProcessorsCol = ctx.marshaller().unmarshal(transformClosBytes,
- U.resolveClassLoader(clsLdr, ctx.gridConfig()));
+ entryProcessorsCol = MarshallerUtils.unmarshal(ctx.gridName(), ctx.marshaller(),
+ transformClosBytes, U.resolveClassLoader(clsLdr, ctx.gridConfig()));
if (filters == null)
filters = CU.empty0();
@@ -894,8 +895,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
val.unmarshal(this.ctx, clsLdr);
- if (expiryPlcBytes != null)
- expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, U.resolveClassLoader(clsLdr, ctx.gridConfig()));
+ if (expiryPlcBytes != null) {
+ expiryPlc = MarshallerUtils.unmarshal(ctx.gridName(), ctx.marshaller(), expiryPlcBytes,
+ U.resolveClassLoader(clsLdr, ctx.gridConfig()));
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
index 4c5a704..98c651b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -191,7 +192,8 @@ public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry implemen
unmarshalKey(ctx, marsh);
if (val == null && valBytes != null) {
- val = marsh.unmarshal(valBytes, U.resolveClassLoader(ctx.kernalContext().config()));
+ val = MarshallerUtils.unmarshal(ctx.kernalContext().gridName(), marsh, valBytes,
+ U.resolveClassLoader(ctx.kernalContext().config()));
val.finishUnmarshal(ctx, null);
}
@@ -222,7 +224,8 @@ public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry implemen
if (key == null) {
assert keyBytes != null;
- key = marsh.unmarshal(keyBytes, U.resolveClassLoader(ctx.kernalContext().config()));
+ key = MarshallerUtils.unmarshal(ctx.kernalContext().gridName(), marsh, keyBytes,
+ U.resolveClassLoader(ctx.kernalContext().config()));
key.finishUnmarshal(ctx, null);
}
@@ -239,13 +242,13 @@ public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry implemen
if (keyBytes == null) {
key.prepareMarshal(ctx);
- keyBytes = marsh.marshal(key);
+ keyBytes = MarshallerUtils.marshal(ctx.kernalContext().gridName(), marsh, key);
}
if (valBytes == null && val != null) {
val.prepareMarshal(ctx);
- valBytes = marsh.marshal(val);
+ valBytes = MarshallerUtils.marshal(ctx.kernalContext().gridName(), marsh, val);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 6630c7a..a3070e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
@@ -107,8 +108,8 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
@Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr)
throws IgniteCheckedException
{
- return ctx.kernalContext().cache().context().marshaller().unmarshal(bytes, U.resolveClassLoader(clsLdr,
- ctx.kernalContext().config()));
+ return MarshallerUtils.unmarshal(ctx.kernalContext().gridName(), ctx.kernalContext().cache().context().marshaller(),
+ bytes, U.resolveClassLoader(clsLdr, ctx.kernalContext().config()));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 1a96954..fb5c2c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -70,6 +70,7 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.LoadBalancerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -1144,16 +1145,18 @@ public class GridClosureProcessor extends GridProcessorAdapter {
if (closureBytes == null) {
closure = c.job;
- closureBytes = marsh.marshal(c.job);
+ closureBytes = MarshallerUtils.marshal(ctx, c.job);
}
if (c.job == closure)
- c.job = marsh.unmarshal(closureBytes, U.resolveClassLoader(ctx.config()));
+ c.job = MarshallerUtils.unmarshal(ctx.gridName(), marsh, closureBytes, U.resolveClassLoader(ctx.config()));
else
- c.job = marsh.unmarshal(marsh.marshal(c.job), U.resolveClassLoader(ctx.config()));
+ c.job = MarshallerUtils.marshalUnmarshal(ctx.gridName(), marsh, c.job,
+ U.resolveClassLoader(ctx.config()));
}
else
- job = marsh.unmarshal(marsh.marshal(job), U.resolveClassLoader(ctx.config()));
+ job = MarshallerUtils.marshalUnmarshal(ctx.gridName(), marsh, job,
+ U.resolveClassLoader(ctx.config()));
}
else
hadLocNode = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index e96e646..28df19f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -77,6 +77,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -288,7 +289,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (msg.data() == null && msg.dataBytes() != null) {
try {
- msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
+ msg.data(MarshallerUtils.unmarshal(ctx.gridName(), marsh, msg.dataBytes(),
+ U.resolveClassLoader(ctx.config())));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to process message (ignoring): " + msg, e);
@@ -676,7 +678,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
reqData.className(clsName);
reqData.deploymentInfo(new GridDeploymentInfoBean(dep));
- reqData.p2pMarshal(marsh);
+ reqData.p2pMarshal(marsh, ctx);
}
// Handle peer deployment for other handler-specific objects.
@@ -733,7 +735,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (msg.data() == null && msg.dataBytes() != null) {
try {
- msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
+ msg.data(MarshallerUtils.unmarshal(ctx.gridName(), marsh, msg.dataBytes(),
+ U.resolveClassLoader(ctx.config())));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to process message (ignoring): " + msg, e);
@@ -976,7 +979,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
- data.p2pUnmarshal(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ data.p2pUnmarshal(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config()), ctx);
}
hnd.p2pUnmarshal(node.id(), ctx);
@@ -1321,7 +1324,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (!msg.messages() &&
msg.data() != null &&
(nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id())))
- msg.dataBytes(marsh.marshal(msg.data()));
+ msg.dataBytes(MarshallerUtils.marshal(ctx, msg.data()));
for (ClusterNode node : nodes) {
int cnt = 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
index cdfe0e1..619bd86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
@@ -23,11 +23,13 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.Nullable;
/**
@@ -90,26 +92,29 @@ class StartRequestData implements Externalizable {
/**
* @param marsh Marshaller.
+ * @param kernalCtx Kernal context.
* @throws org.apache.ignite.IgniteCheckedException In case of error.
*/
- void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
+ void p2pMarshal(Marshaller marsh, final GridKernalContext kernalCtx) throws IgniteCheckedException {
assert marsh != null;
- prjPredBytes = marsh.marshal(prjPred);
+ prjPredBytes = MarshallerUtils.marshal(kernalCtx.gridName(), marsh, prjPred);
}
/**
* @param marsh Marshaller.
* @param ldr Class loader.
+ * @param kernalCtx Kernal context.
* @throws org.apache.ignite.IgniteCheckedException In case of error.
*/
- void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+ void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr,
+ final GridKernalContext kernalCtx) throws IgniteCheckedException {
assert marsh != null;
assert prjPred == null;
assert prjPredBytes != null;
- prjPred = marsh.unmarshal(prjPredBytes, ldr);
+ prjPred = MarshallerUtils.unmarshal(kernalCtx.gridName(), marsh, prjPredBytes, ldr);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index c7c1f5e..853d1a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -90,7 +91,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
if (ctx.config().isDaemon())
return;
- marshErrBytes = marsh.marshal(new IgniteCheckedException("Failed to marshal response error, " +
+ marshErrBytes = MarshallerUtils.marshal(ctx, new IgniteCheckedException("Failed to marshal response error, " +
"see node log for details."));
flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
@@ -235,7 +236,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
Object topic;
try {
- topic = marsh.unmarshal(req.responseTopicBytes(), U.resolveClassLoader(null, ctx.config()));
+ topic = MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.responseTopicBytes(),
+ U.resolveClassLoader(null, ctx.config()));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal topic from request: " + req, e);
@@ -275,7 +277,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
StreamReceiver<K, V> updater;
try {
- updater = marsh.unmarshal(req.updaterBytes(), U.resolveClassLoader(clsLdr, ctx.config()));
+ updater = MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.updaterBytes(),
+ U.resolveClassLoader(clsLdr, ctx.config()));
if (updater != null)
ctx.resource().injectGeneric(updater);
@@ -329,7 +332,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
byte[] errBytes;
try {
- errBytes = err != null ? marsh.marshal(err) : null;
+ errBytes = err != null ? MarshallerUtils.marshal(ctx, err) : null;
}
catch (Exception e) {
U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 21df559..9d04a5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -98,6 +98,7 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.stream.StreamReceiver;
import org.jetbrains.annotations.Nullable;
@@ -1359,11 +1360,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
if (updaterBytes == null) {
assert rcvr != null;
- updaterBytes = ctx.config().getMarshaller().marshal(rcvr);
+ updaterBytes = MarshallerUtils.marshal(ctx, rcvr);
}
if (topicBytes == null)
- topicBytes = ctx.config().getMarshaller().marshal(topic);
+ topicBytes = MarshallerUtils.marshal(ctx, topic);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal (request will not be sent).", e);
@@ -1494,8 +1495,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
try {
GridPeerDeployAware jobPda0 = jobPda;
- err = ctx.config().getMarshaller().unmarshal(
- errBytes,
+ err = MarshallerUtils.unmarshal(ctx.gridName(), ctx.config().getMarshaller(), errBytes,
U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()));
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
index 0bf3dde..20c80bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
@@ -21,8 +21,10 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -92,19 +94,20 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
}
/** {@inheritDoc} */
- @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
- super.prepareMarshal(marsh);
+ @Override public void prepareMarshal(Marshaller marsh, GridKernalContext kernalCtx) throws IgniteCheckedException {
+ super.prepareMarshal(marsh, kernalCtx);
if (err != null)
- errBytes = marsh.marshal(err);
+ errBytes = MarshallerUtils.marshal(kernalCtx.gridName(), marsh, err);
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(marsh, ldr);
+ @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr,
+ final GridKernalContext kernalCtx) throws IgniteCheckedException {
+ super.finishUnmarshal(marsh, ldr, kernalCtx);
if (errBytes != null)
- err = marsh.unmarshal(errBytes, ldr);
+ err = MarshallerUtils.unmarshal(kernalCtx.gridName(), marsh, errBytes, ldr);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
index 412c45b..d1d40e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.igfs;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -34,18 +35,21 @@ public abstract class IgfsCommunicationMessage implements Message {
/**
* @param marsh Marshaller.
+ * @param kernalCtx Kernal context.
* @throws IgniteCheckedException In case of error.
*/
- public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
+ public void prepareMarshal(Marshaller marsh, final GridKernalContext kernalCtx) throws IgniteCheckedException {
// No-op.
}
/**
* @param marsh Marshaller.
* @param ldr Class loader.
+ * @param kernalCtx Kernal context.
* @throws IgniteCheckedException In case of error.
*/
- public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+ public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr,
+ final GridKernalContext kernalCtx) throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index a638bf3..76290e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -148,7 +148,7 @@ public class IgfsContext {
public void send(UUID nodeId, Object topic, IgfsCommunicationMessage msg, byte plc)
throws IgniteCheckedException {
if (!kernalContext().localNodeId().equals(nodeId))
- msg.prepareMarshal(kernalContext().config().getMarshaller());
+ msg.prepareMarshal(kernalContext().config().getMarshaller(), kernalContext());
kernalContext().io().send(nodeId, topic, msg, plc);
}
@@ -163,7 +163,7 @@ public class IgfsContext {
public void send(ClusterNode node, Object topic, IgfsCommunicationMessage msg, byte plc)
throws IgniteCheckedException {
if (!kernalContext().localNodeId().equals(node.id()))
- msg.prepareMarshal(kernalContext().config().getMarshaller());
+ msg.prepareMarshal(kernalContext().config().getMarshaller(), kernalContext());
kernalContext().io().send(node, topic, msg, plc);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index bdf3b08..ed1f525 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -1149,7 +1149,7 @@ public class IgfsDataManager extends IgfsManager {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) {
try {
- ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
+ ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null, igfsCtx.kernalContext());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal message (will ignore): " + ackMsg, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
index e59b257..d0776bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
@@ -21,9 +21,11 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -91,19 +93,20 @@ public class IgfsDeleteMessage extends IgfsCommunicationMessage {
}
/** {@inheritDoc} */
- @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
- super.prepareMarshal(marsh);
+ @Override public void prepareMarshal(Marshaller marsh, GridKernalContext kernalCtx) throws IgniteCheckedException {
+ super.prepareMarshal(marsh, kernalCtx);
if (err != null)
- errBytes = marsh.marshal(err);
+ errBytes = MarshallerUtils.marshal(kernalCtx.gridName(), marsh, err);
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(marsh, ldr);
+ @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr,
+ final GridKernalContext kernalCtx) throws IgniteCheckedException {
+ super.finishUnmarshal(marsh, ldr, kernalCtx);
if (errBytes != null)
- err = marsh.unmarshal(errBytes, ldr);
+ err = MarshallerUtils.unmarshal(kernalCtx.gridName(), marsh, errBytes, ldr);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
index d64c64a..e04445f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -263,7 +263,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
*/
@SuppressWarnings("fallthrough")
private void processFragmentizerRequest(IgfsFragmentizerRequest req) throws IgniteCheckedException {
- req.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
+ req.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null, igfsCtx.kernalContext());
Collection<IgfsFileAffinityRange> ranges = req.fragmentRanges();
IgniteUuid fileId = req.fileId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 7234e52..a25d082 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -256,7 +256,7 @@ public final class IgfsImpl implements IgfsEx {
if (secondaryFs instanceof HadoopPayloadAware)
secondaryFsPayload = ((HadoopPayloadAware) secondaryFs).getPayload();
- secondaryPaths = new IgfsPaths(secondaryFsPayload, dfltMode, modeRslvr.modesOrdered());
+ secondaryPaths = new IgfsPaths(secondaryFsPayload, dfltMode, modeRslvr.modesOrdered(), igfsCtx);
// Check whether IGFS LRU eviction policy is set on data cache.
String dataCacheName = igfsCtx.configuration().getDataCacheName();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
index 4a79259..9892bc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
@@ -31,6 +31,7 @@ import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.jetbrains.annotations.Nullable;
@@ -63,9 +64,11 @@ public class IgfsPaths implements Externalizable {
* @param payload Payload.
* @param dfltMode Default IGFS mode.
* @param pathModes Path modes.
+ * @param igfsCtx Ignite FS context.
* @throws IgniteCheckedException If failed.
*/
- public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, IgfsMode>> pathModes)
+ public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, IgfsMode>> pathModes,
+ final IgfsContext igfsCtx)
throws IgniteCheckedException {
this.dfltMode = dfltMode;
this.pathModes = pathModes;
@@ -75,7 +78,7 @@ public class IgfsPaths implements Externalizable {
else {
ByteArrayOutputStream out = new ByteArrayOutputStream();
- new JdkMarshaller().marshal(payload, out);
+ MarshallerUtils.marshal(igfsCtx.kernalContext().gridName(), new JdkMarshaller(), payload, out);
payloadBytes = out.toByteArray();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 906d298..5938eb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -76,6 +76,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -403,7 +404,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
boolean loc = ctx.localNodeId().equals(taskNode.id()) && !ctx.config().isMarshalLocalJobs();
GridTaskSessionRequest req = new GridTaskSessionRequest(ses.getId(), ses.getJobId(),
- loc ? null : marsh.marshal(attrs), attrs);
+ loc ? null : MarshallerUtils.marshal(ctx, attrs), attrs);
Object topic = TOPIC_TASK.topic(ses.getJobId(), ctx.discovery().localNode().id());
@@ -454,7 +455,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (res.jobSiblings() == null) {
try {
- res.unmarshalSiblings(marsh);
+ res.unmarshalSiblings(marsh, ctx);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal job siblings.", e);
@@ -516,7 +517,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
ctx.io().send(taskNode, TOPIC_JOB_SIBLINGS,
new GridJobSiblingsRequest(ses.getId(),
loc ? topic : null,
- loc ? null : marsh.marshal(topic)),
+ loc ? null : MarshallerUtils.marshal(ctx, topic)),
SYSTEM_POOL);
// 4. Listen to discovery events.
@@ -1018,7 +1019,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (siblings0 == null) {
assert req.getSiblingsBytes() != null;
- siblings0 = marsh.unmarshal(req.getSiblingsBytes(), U.resolveClassLoader(ctx.config()));
+ siblings0 = MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.getSiblingsBytes(),
+ U.resolveClassLoader(ctx.config()));
}
siblings = new ArrayList<>(siblings0);
@@ -1029,9 +1031,10 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (req.isSessionFullSupport()) {
sesAttrs = req.getSessionAttributes();
- if (sesAttrs == null)
- sesAttrs = marsh.unmarshal(req.getSessionAttributesBytes(),
+ if (sesAttrs == null) {
+ sesAttrs = MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.getSessionAttributesBytes(),
U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ }
}
// Note that we unmarshal session/job attributes here with proper class loader.
@@ -1056,9 +1059,10 @@ public class GridJobProcessor extends GridProcessorAdapter {
Map<? extends Serializable, ? extends Serializable> jobAttrs = req.getJobAttributes();
- if (jobAttrs == null)
- jobAttrs = marsh.unmarshal(req.getJobAttributesBytes(),
+ if (jobAttrs == null) {
+ jobAttrs = MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.getJobAttributesBytes(),
U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ }
jobCtx = new GridJobContextImpl(ctx, req.getJobId(), jobAttrs);
}
@@ -1330,11 +1334,11 @@ public class GridJobProcessor extends GridProcessorAdapter {
locNodeId,
req.getSessionId(),
req.getJobId(),
- loc ? null : marsh.marshal(ex),
+ loc ? null : MarshallerUtils.marshal(ctx, ex),
ex,
- loc ? null : marsh.marshal(null),
+ loc ? null : MarshallerUtils.marshal(ctx, null),
null,
- loc ? null : marsh.marshal(null),
+ loc ? null : MarshallerUtils.marshal(ctx, null),
null,
false);
@@ -1425,7 +1429,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
boolean loc = ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs();
Map<?, ?> attrs = loc ? req.getAttributes() :
- (Map<?, ?>)marsh.unmarshal(req.getAttributesBytes(),
+ (Map<?, ?>)MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.getAttributesBytes(),
U.resolveClassLoader(ses.getClassLoader(), ctx.config()));
if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 164c9e7..39457e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -54,6 +54,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED;
@@ -406,7 +407,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
try {
if (job == null) {
- job = marsh.unmarshal(jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ job = MarshallerUtils.unmarshal(ctx.gridName(), marsh, jobBytes,
+ U.resolveClassLoader(dep.classLoader(), ctx.config()));
// No need to hold reference any more.
jobBytes = null;
@@ -744,11 +746,11 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
ctx.localNodeId(),
ses.getId(),
ses.getJobId(),
- loc ? null : marsh.marshal(ex),
+ loc ? null : MarshallerUtils.marshal(ctx, ex),
loc ? ex : null,
- loc ? null: marsh.marshal(res),
+ loc ? null: MarshallerUtils.marshal(ctx, res),
loc ? res : null,
- loc ? null : marsh.marshal(attrs),
+ loc ? null : MarshallerUtils.marshal(ctx, attrs),
loc ? attrs : null,
isCancelled());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
index d9d4421..1de7049 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -108,7 +109,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
private byte[] keyBytes(KeyCacheObject key, @Nullable byte[] keyBytes) throws IgniteCheckedException {
assert key != null;
- return keyBytes != null ? keyBytes : marsh.marshal(key);
+ return keyBytes != null ? keyBytes : MarshallerUtils.marshal(ctx, key);
}
/**
@@ -212,7 +213,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
if (valBytes == null)
return null;
- return marsh.unmarshal(valBytes, U.resolveClassLoader(ldr, ctx.config()));
+ return MarshallerUtils.unmarshal(ctx.gridName(), marsh, valBytes, U.resolveClassLoader(ldr, ctx.config()));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 6937196..c052570 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
@@ -135,13 +136,13 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
res.error(err.getMessage());
else {
res.result(desc.result());
- res.resultBytes(ctx.config().getMarshaller().marshal(desc.result()));
+ res.resultBytes(MarshallerUtils.marshal(ctx, desc.result()));
}
}
else
res.found(false);
- Object topic = ctx.config().getMarshaller().unmarshal(req.topicBytes(),
+ Object topic = MarshallerUtils.unmarshal(ctx.gridName(), ctx.config().getMarshaller(), req.topicBytes(),
U.resolveClassLoader(ctx.config()));
ctx.io().send(nodeId, topic, res, SYSTEM_POOL);
@@ -440,7 +441,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
res = (GridTaskResultResponse)msg;
try {
- res.result(ctx.config().getMarshaller().unmarshal(res.resultBytes(),
+ res.result(MarshallerUtils.unmarshal(ctx.gridName(), ctx.config().getMarshaller(), res.resultBytes(),
U.resolveClassLoader(ctx.config())));
}
catch (IgniteCheckedException e) {
@@ -494,7 +495,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
// 2. Send message.
try {
- byte[] topicBytes = ctx.config().getMarshaller().marshal(topic);
+ byte[] topicBytes = MarshallerUtils.marshal(ctx, topic);
ctx.io().send(taskNode, TOPIC_REST, new GridTaskResultRequest(taskId, topic, topicBytes), SYSTEM_POOL);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java
index 5beff75..133d754 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.jetbrains.annotations.Nullable;
@@ -169,7 +170,7 @@ public class GridTcpRestParser implements GridNioParser {
GridClientMessage msg = (GridClientMessage)msg0;
if (msg instanceof GridMemcachedMessage)
- return encodeMemcache((GridMemcachedMessage)msg);
+ return encodeMemcache((GridMemcachedMessage)msg, ses.gridName());
else if (msg instanceof GridClientPingPacket)
return ByteBuffer.wrap(GridClientPingPacket.PING_PACKET);
else if (msg instanceof GridClientHandshakeRequest) {
@@ -533,10 +534,12 @@ public class GridTcpRestParser implements GridNioParser {
* Encodes memcache message to a raw byte array.
*
* @param msg Message being serialized.
+ * @param gridName Grid name.
* @return Serialized message.
* @throws IgniteCheckedException If serialization failed.
*/
- private ByteBuffer encodeMemcache(GridMemcachedMessage msg) throws IgniteCheckedException {
+ private ByteBuffer encodeMemcache(GridMemcachedMessage msg,
+ final String gridName) throws IgniteCheckedException {
GridByteArrayList res = new GridByteArrayList(HDR_LEN);
int keyLen = 0;
@@ -546,7 +549,7 @@ public class GridTcpRestParser implements GridNioParser {
if (msg.key() != null) {
ByteArrayOutputStream rawKey = new ByteArrayOutputStream();
- keyFlags = encodeObj(msg.key(), rawKey);
+ keyFlags = encodeObj(msg.key(), rawKey, gridName);
msg.key(rawKey.toByteArray());
@@ -560,7 +563,7 @@ public class GridTcpRestParser implements GridNioParser {
if (msg.value() != null) {
ByteArrayOutputStream rawVal = new ByteArrayOutputStream();
- valFlags = encodeObj(msg.value(), rawVal);
+ valFlags = encodeObj(msg.value(), rawVal, gridName);
msg.value(rawVal.toByteArray());
@@ -645,7 +648,7 @@ public class GridTcpRestParser implements GridNioParser {
byte[] rawKey = (byte[])req.key();
// Only values can be hessian-encoded.
- req.key(decodeObj(keyFlags, rawKey));
+ req.key(decodeObj(keyFlags, rawKey, ses.gridName()));
}
if (req.value() != null) {
@@ -653,7 +656,7 @@ public class GridTcpRestParser implements GridNioParser {
byte[] rawVal = (byte[])req.value();
- req.value(decodeObj(valFlags, rawVal));
+ req.value(decodeObj(valFlags, rawVal, ses.gridName()));
}
}
@@ -711,14 +714,16 @@ public class GridTcpRestParser implements GridNioParser {
*
* @param flags Flags.
* @param bytes Byte array to decode.
+ * @param gridName Grid name.
* @return Decoded value.
* @throws IgniteCheckedException If deserialization failed.
*/
- private Object decodeObj(short flags, byte[] bytes) throws IgniteCheckedException {
+ private Object decodeObj(short flags, byte[] bytes,
+ final String gridName) throws IgniteCheckedException {
assert bytes != null;
if ((flags & SERIALIZED_FLAG) != 0)
- return jdkMarshaller.unmarshal(bytes, null);
+ return MarshallerUtils.unmarshal(gridName, jdkMarshaller, bytes, null);
int masked = flags & 0xff00;
@@ -749,10 +754,11 @@ public class GridTcpRestParser implements GridNioParser {
*
* @param obj Object to serialize.
* @param out Output stream to which object should be written.
+ * @param gridName Grid name.
* @return Serialization flags.
* @throws IgniteCheckedException If JDK serialization failed.
*/
- private int encodeObj(Object obj, ByteArrayOutputStream out) throws IgniteCheckedException {
+ private int encodeObj(Object obj, ByteArrayOutputStream out, final String gridName) throws IgniteCheckedException {
int flags = 0;
byte[] data = null;
@@ -800,7 +806,7 @@ public class GridTcpRestParser implements GridNioParser {
flags |= BYTE_ARR_FLAG;
}
else {
- jdkMarshaller.marshal(obj, out);
+ MarshallerUtils.marshal(gridName, jdkMarshaller, obj, out);
flags |= SERIALIZED_FLAG;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index b418ba2..654114d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -83,6 +83,7 @@ import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.services.Service;
@@ -447,18 +448,16 @@ public class GridServiceProcessor extends GridProcessorAdapter {
validate(cfg);
if (!state.srvcCompatibility) {
- Marshaller marsh = ctx.config().getMarshaller();
-
LazyServiceConfiguration cfg0;
try {
- byte[] srvcBytes = marsh.marshal(cfg.getService());
+ byte[] srvcBytes = MarshallerUtils.marshal(ctx, cfg.getService());
cfg0 = new LazyServiceConfiguration(cfg, srvcBytes);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal service with configured marshaller [srvc=" + cfg.getService()
- + ", marsh=" + marsh + "]", e);
+ + ", marsh=" + ctx.config().getMarshaller() + "]", e);
return new GridFinishedFuture<>(e);
}
@@ -1123,7 +1122,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (cfg instanceof LazyServiceConfiguration) {
byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes();
- Service srvc = m.unmarshal(bytes, U.resolveClassLoader(null, ctx.config()));
+ Service srvc = MarshallerUtils.unmarshal(ctx.gridName(), m, bytes, U.resolveClassLoader(null, ctx.config()));
ctx.resource().inject(srvc);
@@ -1133,10 +1132,10 @@ public class GridServiceProcessor extends GridProcessorAdapter {
Service svc = cfg.getService();
try {
- byte[] bytes = m.marshal(svc);
+ byte[] bytes = MarshallerUtils.marshal(ctx, svc);
- Service cp = m.unmarshal(bytes,
- U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
+ Service cp = MarshallerUtils.unmarshal(ctx.gridName(), m, bytes, U.resolveClassLoader(svc.getClass().getClassLoader(),
+ ctx.config()));
ctx.resource().inject(cp);