You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/08/24 14:04:52 UTC
[29/50] [abbrv] ignite git commit: IGNITE-2649 -
Ignition.localIgnite() unreliable under Gateways and cause wrong components
deserialization.
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 6194552..a1f6700 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -133,10 +134,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
super.prepareMarshal(ctx);
if (parts != null && partsBytes == null)
- partsBytes = ctx.marshaller().marshal(parts);
+ partsBytes = MarshallerUtils.marshal(ctx.marshaller(), parts, ctx.gridName());
if (partCntrs != null)
- partCntrsBytes = ctx.marshaller().marshal(partCntrs);
+ partCntrsBytes = MarshallerUtils.marshal(ctx.marshaller(), partCntrs, ctx.gridName());
}
/**
@@ -157,14 +158,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (partsBytes != null && parts == null)
- parts = ctx.marshaller().unmarshal(partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (partsBytes != null && parts == null) {
+ parts = MarshallerUtils.unmarshal(ctx.marshaller(), partsBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()), ctx.gridName());
+ }
if (parts == null)
parts = new HashMap<>();
- if (partCntrsBytes != null && partCntrs == null)
- partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (partCntrsBytes != null && partCntrs == null) {
+ partCntrs = MarshallerUtils.unmarshal(ctx.marshaller(), partCntrsBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()), ctx.gridName());
+ }
if (partCntrs == null)
partCntrs = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 0fe4259..3be312b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -137,21 +138,25 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
super.prepareMarshal(ctx);
if (partsBytes == null && parts != null)
- partsBytes = ctx.marshaller().marshal(parts);
+ partsBytes = MarshallerUtils.marshal(ctx.marshaller(), parts, ctx.gridName());
if (partCntrs != null)
- partCntrsBytes = ctx.marshaller().marshal(partCntrs);
+ partCntrsBytes = MarshallerUtils.marshal(ctx.marshaller(), partCntrs, ctx.gridName());
}
/** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (partsBytes != null && parts == null)
- parts = ctx.marshaller().unmarshal(partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (partsBytes != null && parts == null) {
+ parts = MarshallerUtils.unmarshal(ctx.marshaller(), partsBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()), ctx.gridName());
+ }
- if (partCntrsBytes != null)
- partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (partCntrsBytes != null) {
+ partCntrs = MarshallerUtils.unmarshal(ctx.marshaller(), partCntrsBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()), ctx.gridName());
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index be92551..ff1a373 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -190,7 +191,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
}
if (err != null)
- errBytes = ctx.marshaller().marshal(err);
+ errBytes = MarshallerUtils.marshal(ctx.marshaller(), err, ctx.gridName());
}
/** {@inheritDoc} */
@@ -204,8 +205,10 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
info.unmarshal(cctx, ldr);
}
- if (errBytes != null)
- err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (errBytes != null) {
+ err = MarshallerUtils.unmarshal(ctx.marshaller(), errBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()), ctx.gridName());
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
index e8d164f..e290310 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -169,7 +170,7 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC
}
if (err != null)
- errBytes = ctx.marshaller().marshal(err);
+ errBytes = MarshallerUtils.marshal(ctx.marshaller(), err, ctx.gridName());
}
/** {@inheritDoc} */
@@ -187,8 +188,10 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC
((GridCacheEntryInfo)res).unmarshal(cctx, ldr);
}
- if (errBytes != null && err == null)
- err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (errBytes != null && err == null) {
+ err = MarshallerUtils.unmarshal(ctx.marshaller(), errBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()), ctx.gridName());
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index 4055b2a..809e11c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -101,15 +102,17 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
super.prepareMarshal(ctx);
if (err != null)
- errBytes = ctx.marshaller().marshal(err);
+ errBytes = MarshallerUtils.marshal(ctx.marshaller(), err, ctx.gridName());
}
/** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null)
- err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (errBytes != null) {
+ err = MarshallerUtils.unmarshal(ctx.marshaller(), errBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()), ctx.gridName());
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
index 91ae12c..d524d70 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
/**
* Local query future.
@@ -112,13 +113,13 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
Marshaller marsh = cctx.marshaller();
- IgniteReducer<Object, Object> rdc = qry.reducer() != null ?
- marsh.<IgniteReducer<Object, Object>>unmarshal(marsh.marshal(qry.reducer()),
- U.resolveClassLoader(cctx.gridConfig())) : null;
+ IgniteReducer<Object, Object> rdc = qry.reducer() != null
+ ? MarshallerUtils.clone(marsh, qry.reducer(), U.resolveClassLoader(cctx.gridConfig()), cctx.gridName())
+ : null;
- IgniteClosure<Object, Object> trans = qry.transform() != null ?
- marsh.<IgniteClosure<Object, Object>>unmarshal(marsh.marshal(qry.transform()),
- U.resolveClassLoader(cctx.gridConfig())) : null;
+ IgniteClosure<Object, Object> trans = qry.transform() != null
+ ? MarshallerUtils.clone(marsh, qry.transform(), U.resolveClassLoader(cctx.gridConfig()), cctx.gridName())
+ : null;
return new GridCacheQueryInfo(
true,
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 88ebe99..b5a6fe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -316,19 +317,25 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- Marshaller mrsh = ctx.marshaller();
+ Marshaller marsh = ctx.marshaller();
- if (keyValFilterBytes != null)
- keyValFilter = mrsh.unmarshal(keyValFilterBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (keyValFilterBytes != null) {
+ keyValFilter = MarshallerUtils.unmarshal(marsh, keyValFilterBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()), ctx.gridName());
+ }
if (rdcBytes != null)
- rdc = mrsh.unmarshal(rdcBytes, ldr);
+ rdc = MarshallerUtils.unmarshal(marsh, rdcBytes, ldr, ctx.gridName());
- if (transBytes != null)
- trans = mrsh.unmarshal(transBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (transBytes != null) {
+ trans = MarshallerUtils.unmarshal(marsh, transBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()), ctx.gridName());
+ }
- if (argsBytes != null)
- args = mrsh.unmarshal(argsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (argsBytes != null) {
+ args = MarshallerUtils.unmarshal(marsh, argsBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()), ctx.gridName());
+ }
}
/** {@inheritDoc} */
@@ -343,10 +350,10 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
void beforeLocalExecution(GridCacheContext ctx) throws IgniteCheckedException {
Marshaller marsh = ctx.marshaller();
- rdc = rdc != null ? marsh.<IgniteReducer<Object, Object>>unmarshal(marsh.marshal(rdc),
- U.resolveClassLoader(ctx.gridConfig())) : null;
- trans = trans != null ? marsh.<IgniteClosure<Object, Object>>unmarshal(marsh.marshal(trans),
- U.resolveClassLoader(ctx.gridConfig())) : null;
+ rdc = rdc != null ? MarshallerUtils.clone(marsh, rdc,
+ U.resolveClassLoader(ctx.gridConfig()), ctx.gridName()) : null;
+ trans = trans != null ? MarshallerUtils.clone(marsh, trans,
+ U.resolveClassLoader(ctx.gridConfig()), ctx.gridName()) : null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index 2cde8f4..28ae466 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -124,7 +125,7 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
GridCacheContext cctx = ctx.cacheContext(cacheId);
if (err != null)
- errBytes = ctx.marshaller().marshal(err);
+ errBytes = MarshallerUtils.marshal(ctx.marshaller(), err, ctx.gridName());
metaDataBytes = marshalCollection(metadata, cctx);
dataBytes = marshalCollection(data, cctx);
@@ -145,8 +146,10 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null)
- err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (errBytes != null) {
+ err = MarshallerUtils.unmarshal(ctx.marshaller(), errBytes,
+ U.resolveClassLoader(ldr, ctx.gridConfig()), ctx.gridName());
+ }
metadata = unmarshalCollection(metaDataBytes, ctx, ldr);
data = unmarshalCollection(dataBytes, ctx, ldr);
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index 0733827..df6621f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -132,19 +133,21 @@ public class GridCacheSqlQuery implements Message {
/**
* @param m Marshaller.
+ * @param kernalCtx kernal context.
* @throws IgniteCheckedException If failed.
*/
- public void marshallParams(Marshaller m) throws IgniteCheckedException {
+ public void marshallParams(Marshaller m, final GridKernalContext kernalCtx) throws IgniteCheckedException {
if (paramsBytes != null)
return;
assert params != null;
- paramsBytes = m.marshal(params);
+ paramsBytes = MarshallerUtils.marshal(m, params, kernalCtx.gridName());
}
/**
* @param m Marshaller.
+ * @param ctx kernal context.
* @throws IgniteCheckedException If failed.
*/
public void unmarshallParams(Marshaller m, GridKernalContext ctx) throws IgniteCheckedException {
@@ -153,7 +156,7 @@ public class GridCacheSqlQuery implements Message {
assert paramsBytes != null;
- params = m.unmarshal(paramsBytes, U.resolveClassLoader(ctx.config()));
+ params = MarshallerUtils.unmarshal(m, paramsBytes, U.resolveClassLoader(ctx.config()), ctx.gridName());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index fc38eba..a9cf668 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -1541,7 +1542,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
depInfo = new GridDeploymentInfoBean(dep);
- bytes = ctx.config().getMarshaller().marshal(obj);
+ bytes = MarshallerUtils.marshal(ctx.config().getMarshaller(), obj, ctx.gridName());
}
/**
@@ -1559,7 +1560,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
- return ctx.config().getMarshaller().unmarshal(bytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ return MarshallerUtils.unmarshal(ctx.config().getMarshaller(), bytes,
+ U.resolveClassLoader(dep.classLoader(), ctx.config()), ctx.gridName());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java
index a8b2da4..ec242f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcMetadataTask.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
@@ -153,7 +154,7 @@ public class GridCacheQueryJdbcMetadataTask extends ComputeTaskAdapter<String, b
status = 0;
- data = MARSHALLER.marshal(F.asList(schemasMap, indexesInfo));
+ data = MarshallerUtils.marshal(MARSHALLER, F.asList(schemasMap, indexesInfo), ignite.configuration());
}
catch (Throwable t) {
U.error(log, "Failed to get metadata for JDBC.", t);
@@ -163,7 +164,7 @@ public class GridCacheQueryJdbcMetadataTask extends ComputeTaskAdapter<String, b
status = 1;
try {
- data = MARSHALLER.marshal(err);
+ data = MarshallerUtils.marshal(MARSHALLER, err, ignite.configuration());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
index 5c60762..3faca1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.util.typedef.CAX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
@@ -82,7 +83,7 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> {
try {
assert arg != null;
- Map<String, Object> args = MARSHALLER.unmarshal(arg, null);
+ Map<String, Object> args = MarshallerUtils.unmarshal(MARSHALLER, arg, null, ignite.configuration());
boolean first = true;
@@ -130,12 +131,13 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> {
if (res.getException() == null) {
status = 0;
- bytes = MARSHALLER.marshal(res.getData());
+ bytes = MarshallerUtils.marshal(MARSHALLER, res.getData(), ignite.configuration());
}
else {
status = 1;
- bytes = MARSHALLER.marshal(new SQLException(res.getException().getMessage()));
+ bytes = MarshallerUtils.marshal(MARSHALLER, new SQLException(res.getException().getMessage()),
+ ignite.configuration());
}
byte[] packet = new byte[bytes.length + 1];
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 3258ce9..8bbe151 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -878,8 +879,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
// Unmarshal transform closure anyway if it exists.
if (transformClosBytes != null && entryProcessorsCol == null)
- entryProcessorsCol = ctx.marshaller().unmarshal(transformClosBytes,
- U.resolveClassLoader(clsLdr, ctx.gridConfig()));
+ entryProcessorsCol = MarshallerUtils.unmarshal(ctx.marshaller(),
+ transformClosBytes, U.resolveClassLoader(clsLdr, ctx.gridConfig()), ctx.gridName());
if (filters == null)
filters = CU.empty0();
@@ -894,8 +895,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
val.unmarshal(this.ctx, clsLdr);
- if (expiryPlcBytes != null)
- expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, U.resolveClassLoader(clsLdr, ctx.gridConfig()));
+ if (expiryPlcBytes != null) {
+ expiryPlc = MarshallerUtils.unmarshal(ctx.marshaller(), expiryPlcBytes,
+ U.resolveClassLoader(clsLdr, ctx.gridConfig()), ctx.gridName());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
index 4c5a704..2c64209 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -191,7 +192,8 @@ public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry implemen
unmarshalKey(ctx, marsh);
if (val == null && valBytes != null) {
- val = marsh.unmarshal(valBytes, U.resolveClassLoader(ctx.kernalContext().config()));
+ val = MarshallerUtils.unmarshal(marsh, valBytes,
+ U.resolveClassLoader(ctx.kernalContext().config()), ctx.kernalContext().gridName());
val.finishUnmarshal(ctx, null);
}
@@ -222,7 +224,8 @@ public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry implemen
if (key == null) {
assert keyBytes != null;
- key = marsh.unmarshal(keyBytes, U.resolveClassLoader(ctx.kernalContext().config()));
+ key = MarshallerUtils.unmarshal(marsh, keyBytes,
+ U.resolveClassLoader(ctx.kernalContext().config()), ctx.kernalContext().gridName());
key.finishUnmarshal(ctx, null);
}
@@ -239,13 +242,13 @@ public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry implemen
if (keyBytes == null) {
key.prepareMarshal(ctx);
- keyBytes = marsh.marshal(key);
+ keyBytes = MarshallerUtils.marshal(marsh, key, ctx.kernalContext().gridName());
}
if (valBytes == null && val != null) {
val.prepareMarshal(ctx);
- valBytes = marsh.marshal(val);
+ valBytes = MarshallerUtils.marshal(marsh, val, ctx.kernalContext().gridName());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 6630c7a..d46fdbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
@@ -107,8 +108,8 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
@Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr)
throws IgniteCheckedException
{
- return ctx.kernalContext().cache().context().marshaller().unmarshal(bytes, U.resolveClassLoader(clsLdr,
- ctx.kernalContext().config()));
+ return MarshallerUtils.unmarshal(ctx.kernalContext().cache().context().marshaller(),
+ bytes, U.resolveClassLoader(clsLdr, ctx.kernalContext().config()), ctx.kernalContext().gridName());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 1a96954..e17c3fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -70,6 +70,7 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.LoadBalancerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -1144,16 +1145,16 @@ public class GridClosureProcessor extends GridProcessorAdapter {
if (closureBytes == null) {
closure = c.job;
- closureBytes = marsh.marshal(c.job);
+ closureBytes = MarshallerUtils.marshal(marsh, c.job, ctx.gridName());
}
if (c.job == closure)
- c.job = marsh.unmarshal(closureBytes, U.resolveClassLoader(ctx.config()));
+ c.job = MarshallerUtils.unmarshal(marsh, closureBytes, U.resolveClassLoader(ctx.config()), ctx.gridName());
else
- c.job = marsh.unmarshal(marsh.marshal(c.job), U.resolveClassLoader(ctx.config()));
+ c.job = MarshallerUtils.clone(marsh, c.job, U.resolveClassLoader(ctx.config()), ctx.gridName());
}
else
- job = marsh.unmarshal(marsh.marshal(job), U.resolveClassLoader(ctx.config()));
+ job = MarshallerUtils.clone(marsh, job, U.resolveClassLoader(ctx.config()), ctx.gridName());
}
else
hadLocNode = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index e96e646..388487f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -77,6 +77,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -288,7 +289,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (msg.data() == null && msg.dataBytes() != null) {
try {
- msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
+ msg.data(MarshallerUtils.unmarshal(marsh, msg.dataBytes(),
+ U.resolveClassLoader(ctx.config()), ctx.gridName()));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to process message (ignoring): " + msg, e);
@@ -676,7 +678,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
reqData.className(clsName);
reqData.deploymentInfo(new GridDeploymentInfoBean(dep));
- reqData.p2pMarshal(marsh);
+ reqData.p2pMarshal(marsh, ctx);
}
// Handle peer deployment for other handler-specific objects.
@@ -733,7 +735,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (msg.data() == null && msg.dataBytes() != null) {
try {
- msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
+ msg.data(MarshallerUtils.unmarshal(marsh, msg.dataBytes(),
+ U.resolveClassLoader(ctx.config()), ctx.gridName()));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to process message (ignoring): " + msg, e);
@@ -976,7 +979,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
- data.p2pUnmarshal(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ data.p2pUnmarshal(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config()), ctx);
}
hnd.p2pUnmarshal(node.id(), ctx);
@@ -1321,7 +1324,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (!msg.messages() &&
msg.data() != null &&
(nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id())))
- msg.dataBytes(marsh.marshal(msg.data()));
+ msg.dataBytes(MarshallerUtils.marshal(marsh, msg.data(), ctx.gridName()));
for (ClusterNode node : nodes) {
int cnt = 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
index cdfe0e1..be4562a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
@@ -23,11 +23,13 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.Nullable;
/**
@@ -90,26 +92,29 @@ class StartRequestData implements Externalizable {
/**
* @param marsh Marshaller.
+ * @param kernalCtx Kernal context.
* @throws org.apache.ignite.IgniteCheckedException In case of error.
*/
- void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
+ void p2pMarshal(Marshaller marsh, final GridKernalContext kernalCtx) throws IgniteCheckedException {
assert marsh != null;
- prjPredBytes = marsh.marshal(prjPred);
+ prjPredBytes = MarshallerUtils.marshal(marsh, prjPred, kernalCtx.gridName());
}
/**
* @param marsh Marshaller.
* @param ldr Class loader.
+ * @param kernalCtx Kernal context.
* @throws org.apache.ignite.IgniteCheckedException In case of error.
*/
- void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+ void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr,
+ final GridKernalContext kernalCtx) throws IgniteCheckedException {
assert marsh != null;
assert prjPred == null;
assert prjPredBytes != null;
- prjPred = marsh.unmarshal(prjPredBytes, ldr);
+ prjPred = MarshallerUtils.unmarshal(marsh, prjPredBytes, ldr, kernalCtx.gridName());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index c7c1f5e..146a864 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -90,8 +91,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
if (ctx.config().isDaemon())
return;
- marshErrBytes = marsh.marshal(new IgniteCheckedException("Failed to marshal response error, " +
- "see node log for details."));
+ marshErrBytes = MarshallerUtils.marshal(marsh, new IgniteCheckedException("Failed to marshal response error, " +
+ "see node log for details."), ctx.gridName());
flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
@Override protected void body() throws InterruptedException {
@@ -235,7 +236,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
Object topic;
try {
- topic = marsh.unmarshal(req.responseTopicBytes(), U.resolveClassLoader(null, ctx.config()));
+ topic = MarshallerUtils.unmarshal(marsh, req.responseTopicBytes(),
+ U.resolveClassLoader(null, ctx.config()), ctx.gridName());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal topic from request: " + req, e);
@@ -275,7 +277,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
StreamReceiver<K, V> updater;
try {
- updater = marsh.unmarshal(req.updaterBytes(), U.resolveClassLoader(clsLdr, ctx.config()));
+ updater = MarshallerUtils.unmarshal(marsh, req.updaterBytes(),
+ U.resolveClassLoader(clsLdr, ctx.config()), ctx.gridName());
if (updater != null)
ctx.resource().injectGeneric(updater);
@@ -329,7 +332,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
byte[] errBytes;
try {
- errBytes = err != null ? marsh.marshal(err) : null;
+ errBytes = err != null ? MarshallerUtils.marshal(marsh, err, ctx.gridName()) : null;
}
catch (Exception e) {
U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 21df559..066c789 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -98,6 +98,7 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.stream.StreamReceiver;
import org.jetbrains.annotations.Nullable;
@@ -1359,11 +1360,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
if (updaterBytes == null) {
assert rcvr != null;
- updaterBytes = ctx.config().getMarshaller().marshal(rcvr);
+ updaterBytes = MarshallerUtils.marshal(ctx.config().getMarshaller(), rcvr, ctx.gridName());
}
if (topicBytes == null)
- topicBytes = ctx.config().getMarshaller().marshal(topic);
+ topicBytes = MarshallerUtils.marshal(ctx.config().getMarshaller(), topic, ctx.gridName());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal (request will not be sent).", e);
@@ -1494,9 +1495,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
try {
GridPeerDeployAware jobPda0 = jobPda;
- err = ctx.config().getMarshaller().unmarshal(
- errBytes,
- U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()));
+ err = MarshallerUtils.unmarshal(ctx.config().getMarshaller(), errBytes,
+ U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()), ctx.gridName());
}
catch (IgniteCheckedException e) {
f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
index 0bf3dde..646d492 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
@@ -21,8 +21,10 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -92,19 +94,20 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
}
/** {@inheritDoc} */
- @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
- super.prepareMarshal(marsh);
+ @Override public void prepareMarshal(Marshaller marsh, final GridKernalContext kernalCtx) throws IgniteCheckedException {
+ super.prepareMarshal(marsh, kernalCtx);
if (err != null)
- errBytes = marsh.marshal(err);
+ errBytes = MarshallerUtils.marshal(marsh, err, kernalCtx.gridName());
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(marsh, ldr);
+ @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr,
+ final GridKernalContext kernalCtx) throws IgniteCheckedException {
+ super.finishUnmarshal(marsh, ldr, kernalCtx);
if (errBytes != null)
- err = marsh.unmarshal(errBytes, ldr);
+ err = MarshallerUtils.unmarshal(marsh, errBytes, ldr, kernalCtx.gridName());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
index 412c45b..d1d40e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.igfs;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -34,18 +35,21 @@ public abstract class IgfsCommunicationMessage implements Message {
/**
* @param marsh Marshaller.
+ * @param kernalCtx Kernal ctx.
* @throws IgniteCheckedException In case of error.
*/
- public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
+ public void prepareMarshal(Marshaller marsh, final GridKernalContext kernalCtx) throws IgniteCheckedException {
// No-op.
}
/**
* @param marsh Marshaller.
* @param ldr Class loader.
+ * @param kernalCtx Kernal ctx.
* @throws IgniteCheckedException In case of error.
*/
- public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+ public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr,
+ final GridKernalContext kernalCtx) throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index a638bf3..76290e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -148,7 +148,7 @@ public class IgfsContext {
public void send(UUID nodeId, Object topic, IgfsCommunicationMessage msg, byte plc)
throws IgniteCheckedException {
if (!kernalContext().localNodeId().equals(nodeId))
- msg.prepareMarshal(kernalContext().config().getMarshaller());
+ msg.prepareMarshal(kernalContext().config().getMarshaller(), kernalContext());
kernalContext().io().send(nodeId, topic, msg, plc);
}
@@ -163,7 +163,7 @@ public class IgfsContext {
public void send(ClusterNode node, Object topic, IgfsCommunicationMessage msg, byte plc)
throws IgniteCheckedException {
if (!kernalContext().localNodeId().equals(node.id()))
- msg.prepareMarshal(kernalContext().config().getMarshaller());
+ msg.prepareMarshal(kernalContext().config().getMarshaller(), kernalContext());
kernalContext().io().send(node, topic, msg, plc);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index bdf3b08..ed1f525 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -1149,7 +1149,7 @@ public class IgfsDataManager extends IgfsManager {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) {
try {
- ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
+ ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null, igfsCtx.kernalContext());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal message (will ignore): " + ackMsg, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
index e59b257..7e206e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
@@ -21,9 +21,11 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
@@ -91,19 +93,21 @@ public class IgfsDeleteMessage extends IgfsCommunicationMessage {
}
/** {@inheritDoc} */
- @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
- super.prepareMarshal(marsh);
+ @Override public void prepareMarshal(Marshaller marsh,
+ final GridKernalContext kernalCtx) throws IgniteCheckedException {
+ super.prepareMarshal(marsh, kernalCtx);
if (err != null)
- errBytes = marsh.marshal(err);
+ errBytes = MarshallerUtils.marshal(marsh, err, kernalCtx.gridName());
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(marsh, ldr);
+ @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr,
+ final GridKernalContext kernalCtx) throws IgniteCheckedException {
+ super.finishUnmarshal(marsh, ldr, kernalCtx);
if (errBytes != null)
- err = marsh.unmarshal(errBytes, ldr);
+ err = MarshallerUtils.unmarshal(marsh, errBytes, ldr, kernalCtx.gridName());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
index d64c64a..e04445f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -263,7 +263,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
*/
@SuppressWarnings("fallthrough")
private void processFragmentizerRequest(IgfsFragmentizerRequest req) throws IgniteCheckedException {
- req.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
+ req.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null, igfsCtx.kernalContext());
Collection<IgfsFileAffinityRange> ranges = req.fragmentRanges();
IgniteUuid fileId = req.fileId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 7234e52..a25d082 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -256,7 +256,7 @@ public final class IgfsImpl implements IgfsEx {
if (secondaryFs instanceof HadoopPayloadAware)
secondaryFsPayload = ((HadoopPayloadAware) secondaryFs).getPayload();
- secondaryPaths = new IgfsPaths(secondaryFsPayload, dfltMode, modeRslvr.modesOrdered());
+ secondaryPaths = new IgfsPaths(secondaryFsPayload, dfltMode, modeRslvr.modesOrdered(), igfsCtx);
// Check whether IGFS LRU eviction policy is set on data cache.
String dataCacheName = igfsCtx.configuration().getDataCacheName();
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
index 4a79259..7571797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
@@ -31,6 +31,7 @@ import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.jetbrains.annotations.Nullable;
@@ -63,9 +64,11 @@ public class IgfsPaths implements Externalizable {
* @param payload Payload.
* @param dfltMode Default IGFS mode.
* @param pathModes Path modes.
+ * @param igfsCtx Ignite FS context.
* @throws IgniteCheckedException If failed.
*/
- public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, IgfsMode>> pathModes)
+ public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, IgfsMode>> pathModes,
+ final IgfsContext igfsCtx)
throws IgniteCheckedException {
this.dfltMode = dfltMode;
this.pathModes = pathModes;
@@ -75,7 +78,7 @@ public class IgfsPaths implements Externalizable {
else {
ByteArrayOutputStream out = new ByteArrayOutputStream();
- new JdkMarshaller().marshal(payload, out);
+ MarshallerUtils.marshal(new JdkMarshaller(), payload, out, igfsCtx.kernalContext().config().getGridName());
payloadBytes = out.toByteArray();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 906d298..baf80e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -76,6 +76,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -403,7 +404,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
boolean loc = ctx.localNodeId().equals(taskNode.id()) && !ctx.config().isMarshalLocalJobs();
GridTaskSessionRequest req = new GridTaskSessionRequest(ses.getId(), ses.getJobId(),
- loc ? null : marsh.marshal(attrs), attrs);
+ loc ? null : MarshallerUtils.marshal(marsh, attrs, ctx.gridName()), attrs);
Object topic = TOPIC_TASK.topic(ses.getJobId(), ctx.discovery().localNode().id());
@@ -454,7 +455,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (res.jobSiblings() == null) {
try {
- res.unmarshalSiblings(marsh);
+ res.unmarshalSiblings(marsh, ctx);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal job siblings.", e);
@@ -516,7 +517,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
ctx.io().send(taskNode, TOPIC_JOB_SIBLINGS,
new GridJobSiblingsRequest(ses.getId(),
loc ? topic : null,
- loc ? null : marsh.marshal(topic)),
+ loc ? null : MarshallerUtils.marshal(marsh, topic, ctx.gridName())),
SYSTEM_POOL);
// 4. Listen to discovery events.
@@ -1018,7 +1019,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (siblings0 == null) {
assert req.getSiblingsBytes() != null;
- siblings0 = marsh.unmarshal(req.getSiblingsBytes(), U.resolveClassLoader(ctx.config()));
+ siblings0 = MarshallerUtils.unmarshal(marsh, req.getSiblingsBytes(),
+ U.resolveClassLoader(ctx.config()), ctx.gridName());
}
siblings = new ArrayList<>(siblings0);
@@ -1029,9 +1031,10 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (req.isSessionFullSupport()) {
sesAttrs = req.getSessionAttributes();
- if (sesAttrs == null)
- sesAttrs = marsh.unmarshal(req.getSessionAttributesBytes(),
- U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ if (sesAttrs == null) {
+ sesAttrs = MarshallerUtils.unmarshal(marsh, req.getSessionAttributesBytes(),
+ U.resolveClassLoader(dep.classLoader(), ctx.config()), ctx.gridName());
+ }
}
// Note that we unmarshal session/job attributes here with proper class loader.
@@ -1056,9 +1059,10 @@ public class GridJobProcessor extends GridProcessorAdapter {
Map<? extends Serializable, ? extends Serializable> jobAttrs = req.getJobAttributes();
- if (jobAttrs == null)
- jobAttrs = marsh.unmarshal(req.getJobAttributesBytes(),
- U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ if (jobAttrs == null) {
+ jobAttrs = MarshallerUtils.unmarshal(marsh, req.getJobAttributesBytes(),
+ U.resolveClassLoader(dep.classLoader(), ctx.config()), ctx.gridName());
+ }
jobCtx = new GridJobContextImpl(ctx, req.getJobId(), jobAttrs);
}
@@ -1330,11 +1334,11 @@ public class GridJobProcessor extends GridProcessorAdapter {
locNodeId,
req.getSessionId(),
req.getJobId(),
- loc ? null : marsh.marshal(ex),
+ loc ? null : MarshallerUtils.marshal(marsh, ex, ctx.gridName()),
ex,
- loc ? null : marsh.marshal(null),
+ loc ? null : MarshallerUtils.marshal(marsh, null, ctx.gridName()),
null,
- loc ? null : marsh.marshal(null),
+ loc ? null : MarshallerUtils.marshal(marsh, null, ctx.gridName()),
null,
false);
@@ -1425,8 +1429,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
boolean loc = ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs();
Map<?, ?> attrs = loc ? req.getAttributes() :
- (Map<?, ?>)marsh.unmarshal(req.getAttributesBytes(),
- U.resolveClassLoader(ses.getClassLoader(), ctx.config()));
+ (Map<?, ?>)MarshallerUtils.unmarshal(marsh, req.getAttributesBytes(),
+ U.resolveClassLoader(ses.getClassLoader(), ctx.config()), ctx.gridName());
if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {
Event evt = new TaskEvent(
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 164c9e7..3d17c62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -54,6 +54,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED;
@@ -406,7 +407,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
try {
if (job == null) {
- job = marsh.unmarshal(jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ job = MarshallerUtils.unmarshal(marsh, jobBytes,
+ U.resolveClassLoader(dep.classLoader(), ctx.config()), ctx.gridName());
// No need to hold reference any more.
jobBytes = null;
@@ -744,11 +746,11 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
ctx.localNodeId(),
ses.getId(),
ses.getJobId(),
- loc ? null : marsh.marshal(ex),
+ loc ? null : MarshallerUtils.marshal(marsh, ex, ctx.gridName()),
loc ? ex : null,
- loc ? null: marsh.marshal(res),
+ loc ? null: MarshallerUtils.marshal(marsh, res, ctx.gridName()),
loc ? res : null,
- loc ? null : marsh.marshal(attrs),
+ loc ? null : MarshallerUtils.marshal(marsh, attrs, ctx.gridName()),
loc ? attrs : null,
isCancelled());
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
index d9d4421..8d4e75e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -108,7 +109,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
private byte[] keyBytes(KeyCacheObject key, @Nullable byte[] keyBytes) throws IgniteCheckedException {
assert key != null;
- return keyBytes != null ? keyBytes : marsh.marshal(key);
+ return keyBytes != null ? keyBytes : MarshallerUtils.marshal(marsh, key, ctx.gridName());
}
/**
@@ -212,7 +213,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
if (valBytes == null)
return null;
- return marsh.unmarshal(valBytes, U.resolveClassLoader(ldr, ctx.config()));
+ return MarshallerUtils.unmarshal(marsh, valBytes, U.resolveClassLoader(ldr, ctx.config()), ctx.gridName());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 6937196..b6adfca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
@@ -135,14 +136,14 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
res.error(err.getMessage());
else {
res.result(desc.result());
- res.resultBytes(ctx.config().getMarshaller().marshal(desc.result()));
+ res.resultBytes(MarshallerUtils.marshal(ctx.config().getMarshaller(), desc.result(), ctx.gridName()));
}
}
else
res.found(false);
- Object topic = ctx.config().getMarshaller().unmarshal(req.topicBytes(),
- U.resolveClassLoader(ctx.config()));
+ Object topic = MarshallerUtils.unmarshal(ctx.config().getMarshaller(), req.topicBytes(),
+ U.resolveClassLoader(ctx.config()), ctx.gridName());
ctx.io().send(nodeId, topic, res, SYSTEM_POOL);
}
@@ -440,8 +441,8 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
res = (GridTaskResultResponse)msg;
try {
- res.result(ctx.config().getMarshaller().unmarshal(res.resultBytes(),
- U.resolveClassLoader(ctx.config())));
+ res.result(MarshallerUtils.unmarshal(ctx.config().getMarshaller(), res.resultBytes(),
+ U.resolveClassLoader(ctx.config()), ctx.gridName()));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal task result: " + res, e);
@@ -494,7 +495,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
// 2. Send message.
try {
- byte[] topicBytes = ctx.config().getMarshaller().marshal(topic);
+ byte[] topicBytes = MarshallerUtils.marshal(ctx.config().getMarshaller(), topic, ctx.gridName());
ctx.io().send(taskNode, TOPIC_REST, new GridTaskResultRequest(taskId, topic, topicBytes), SYSTEM_POOL);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java
index 5beff75..21f105d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java
@@ -25,6 +25,7 @@ import java.nio.charset.Charset;
import java.util.Date;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeRequest;
import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeResponse;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.jetbrains.annotations.Nullable;
@@ -169,7 +171,7 @@ public class GridTcpRestParser implements GridNioParser {
GridClientMessage msg = (GridClientMessage)msg0;
if (msg instanceof GridMemcachedMessage)
- return encodeMemcache((GridMemcachedMessage)msg);
+ return encodeMemcache((GridMemcachedMessage)msg, ses.igniteConfiguration());
else if (msg instanceof GridClientPingPacket)
return ByteBuffer.wrap(GridClientPingPacket.PING_PACKET);
else if (msg instanceof GridClientHandshakeRequest) {
@@ -208,7 +210,7 @@ public class GridTcpRestParser implements GridNioParser {
else {
GridClientMarshaller marsh = marshaller(ses);
- ByteBuffer res = marsh.marshal(msg, 45);
+ ByteBuffer res = MarshallerUtils.marshal(marsh, msg, 45, ses.igniteConfiguration().getGridName());
ByteBuffer slice = res.slice();
@@ -519,7 +521,8 @@ public class GridTcpRestParser implements GridNioParser {
else {
GridClientMarshaller marsh = marshaller(ses);
- msg = marsh.unmarshal(state.buffer().toByteArray());
+ msg = MarshallerUtils.unmarshal(marsh, state.buffer().toByteArray(),
+ ses.igniteConfiguration().getGridName());
msg.requestId(state.header().reqId());
msg.clientId(state.header().clientId());
@@ -533,10 +536,12 @@ public class GridTcpRestParser implements GridNioParser {
* Encodes memcache message to a raw byte array.
*
* @param msg Message being serialized.
+ * @param igniteCfg Ignite config.
* @return Serialized message.
* @throws IgniteCheckedException If serialization failed.
*/
- private ByteBuffer encodeMemcache(GridMemcachedMessage msg) throws IgniteCheckedException {
+ private ByteBuffer encodeMemcache(GridMemcachedMessage msg,
+ final IgniteConfiguration igniteCfg) throws IgniteCheckedException {
GridByteArrayList res = new GridByteArrayList(HDR_LEN);
int keyLen = 0;
@@ -546,7 +551,7 @@ public class GridTcpRestParser implements GridNioParser {
if (msg.key() != null) {
ByteArrayOutputStream rawKey = new ByteArrayOutputStream();
- keyFlags = encodeObj(msg.key(), rawKey);
+ keyFlags = encodeObj(msg.key(), rawKey, igniteCfg);
msg.key(rawKey.toByteArray());
@@ -560,7 +565,7 @@ public class GridTcpRestParser implements GridNioParser {
if (msg.value() != null) {
ByteArrayOutputStream rawVal = new ByteArrayOutputStream();
- valFlags = encodeObj(msg.value(), rawVal);
+ valFlags = encodeObj(msg.value(), rawVal, igniteCfg);
msg.value(rawVal.toByteArray());
@@ -645,7 +650,7 @@ public class GridTcpRestParser implements GridNioParser {
byte[] rawKey = (byte[])req.key();
// Only values can be hessian-encoded.
- req.key(decodeObj(keyFlags, rawKey));
+ req.key(decodeObj(keyFlags, rawKey, ses.igniteConfiguration()));
}
if (req.value() != null) {
@@ -653,7 +658,7 @@ public class GridTcpRestParser implements GridNioParser {
byte[] rawVal = (byte[])req.value();
- req.value(decodeObj(valFlags, rawVal));
+ req.value(decodeObj(valFlags, rawVal, ses.igniteConfiguration()));
}
}
@@ -711,14 +716,16 @@ public class GridTcpRestParser implements GridNioParser {
*
* @param flags Flags.
* @param bytes Byte array to decode.
+ * @param igniteCfg Ignite config.
* @return Decoded value.
* @throws IgniteCheckedException If deserialization failed.
*/
- private Object decodeObj(short flags, byte[] bytes) throws IgniteCheckedException {
+ private Object decodeObj(short flags, byte[] bytes,
+ final IgniteConfiguration igniteCfg) throws IgniteCheckedException {
assert bytes != null;
if ((flags & SERIALIZED_FLAG) != 0)
- return jdkMarshaller.unmarshal(bytes, null);
+ return MarshallerUtils.unmarshal(jdkMarshaller, bytes, null, igniteCfg);
int masked = flags & 0xff00;
@@ -749,10 +756,12 @@ public class GridTcpRestParser implements GridNioParser {
*
* @param obj Object to serialize.
* @param out Output stream to which object should be written.
+ * @param igniteCfg Ignite config.
* @return Serialization flags.
* @throws IgniteCheckedException If JDK serialization failed.
*/
- private int encodeObj(Object obj, ByteArrayOutputStream out) throws IgniteCheckedException {
+ private int encodeObj(Object obj, ByteArrayOutputStream out,
+ final IgniteConfiguration igniteCfg) throws IgniteCheckedException {
int flags = 0;
byte[] data = null;
@@ -800,7 +809,7 @@ public class GridTcpRestParser implements GridNioParser {
flags |= BYTE_ARR_FLAG;
}
else {
- jdkMarshaller.marshal(obj, out);
+ MarshallerUtils.marshal(jdkMarshaller, obj, out, igniteCfg.getGridName());
flags |= SERIALIZED_FLAG;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/97ce8fbb/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index b418ba2..c17c9fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -83,6 +83,7 @@ import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.services.Service;
@@ -452,7 +453,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
LazyServiceConfiguration cfg0;
try {
- byte[] srvcBytes = marsh.marshal(cfg.getService());
+ byte[] srvcBytes = MarshallerUtils.marshal(marsh, cfg.getService(), ctx.gridName());
cfg0 = new LazyServiceConfiguration(cfg, srvcBytes);
}
@@ -1123,7 +1124,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (cfg instanceof LazyServiceConfiguration) {
byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes();
- Service srvc = m.unmarshal(bytes, U.resolveClassLoader(null, ctx.config()));
+ Service srvc = MarshallerUtils.unmarshal(m, bytes, U.resolveClassLoader(null, ctx.config()), ctx.gridName());
ctx.resource().inject(srvc);
@@ -1133,10 +1134,10 @@ public class GridServiceProcessor extends GridProcessorAdapter {
Service svc = cfg.getService();
try {
- byte[] bytes = m.marshal(svc);
+ byte[] bytes = MarshallerUtils.marshal(m, svc, ctx.gridName());
- Service cp = m.unmarshal(bytes,
- U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
+ Service cp = MarshallerUtils.unmarshal(m, bytes, U.resolveClassLoader(svc.getClass().getClassLoader(),
+ ctx.config()), ctx.gridName());
ctx.resource().inject(cp);