You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/06 14:40:08 UTC
[46/50] [abbrv] ignite git commit: IGNITE-2649: Ensured correct local
Ignite instance processing during serialization and deserialization.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 0c3cf0e..9633a65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.jetbrains.annotations.Nullable;
import org.jsr166.LongAdder8;
@@ -913,7 +914,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
GridTaskSessionRequest req = new GridTaskSessionRequest(
ses.getId(),
null,
- loc ? null : marsh.marshal(attrs),
+ loc ? null : MarshallerUtils.marshal(ctx, attrs),
attrs);
// Make sure to go through IO manager always, since order
@@ -1029,7 +1030,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
boolean loc = ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs();
Map<?, ?> attrs = loc ? msg.getAttributes() :
- marsh.<Map<?, ?>>unmarshal(msg.getAttributesBytes(),
+ MarshallerUtils.<Map<?, ?>>unmarshal(ctx.gridName(), marsh, msg.getAttributesBytes(),
U.resolveClassLoader(task.getTask().getClass().getClassLoader(), ctx.config()));
GridTaskSessionImpl ses = task.getSession();
@@ -1305,7 +1306,8 @@ public class GridTaskProcessor extends GridProcessorAdapter {
if (topic == null) {
assert req.topicBytes() != null;
- topic = marsh.unmarshal(req.topicBytes(), U.resolveClassLoader(ctx.config()));
+ topic = MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.topicBytes(),
+ U.resolveClassLoader(ctx.config()));
}
boolean loc = ctx.localNodeId().equals(nodeId);
@@ -1313,7 +1315,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
ctx.io().send(nodeId, topic,
new GridJobSiblingsResponse(
loc ? siblings : null,
- loc ? null : marsh.marshal(siblings)),
+ loc ? null : MarshallerUtils.marshal(ctx, siblings)),
SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 8ce005a..10942da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.TaskContinuousMapperResource;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
@@ -774,15 +775,15 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
try {
boolean loc = ctx.localNodeId().equals(res.getNodeId()) && !ctx.config().isMarshalLocalJobs();
- Object res0 = loc ? res.getJobResult() : marsh.unmarshal(res.getJobResultBytes(),
- U.resolveClassLoader(clsLdr, ctx.config()));
+ Object res0 = loc ? res.getJobResult() : MarshallerUtils.unmarshal(ctx.gridName(), marsh,
+ res.getJobResultBytes(), U.resolveClassLoader(clsLdr, ctx.config()));
IgniteException ex = loc ? res.getException() :
- marsh.<IgniteException>unmarshal(res.getExceptionBytes(),
+ MarshallerUtils.<IgniteException>unmarshal(ctx.gridName(), marsh, res.getExceptionBytes(),
U.resolveClassLoader(clsLdr, ctx.config()));
Map<Object, Object> attrs = loc ? res.getJobAttributes() :
- marsh.<Map<Object, Object>>unmarshal(res.getJobAttributesBytes(),
+ MarshallerUtils.<Map<Object, Object>>unmarshal(ctx.gridName(), marsh, res.getJobAttributesBytes(),
U.resolveClassLoader(clsLdr, ctx.config()));
jobRes.onResponse(res0, ex, attrs, res.isCancelled());
@@ -1253,16 +1254,16 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
ses.getTaskName(),
ses.getUserVersion(),
ses.getTaskClassName(),
- loc ? null : marsh.marshal(res.getJob()),
+ loc ? null : MarshallerUtils.marshal(ctx, res.getJob()),
loc ? res.getJob() : null,
ses.getStartTime(),
timeout,
ses.getTopology(),
- loc ? null : marsh.marshal(ses.getJobSiblings()),
+ loc ? null : MarshallerUtils.marshal(ctx, ses.getJobSiblings()),
loc ? ses.getJobSiblings() : null,
- loc ? null : marsh.marshal(sesAttrs),
+ loc ? null : MarshallerUtils.marshal(ctx, sesAttrs),
loc ? sesAttrs : null,
- loc ? null : marsh.marshal(jobAttrs),
+ loc ? null : MarshallerUtils.marshal(ctx, jobAttrs),
loc ? jobAttrs : null,
ses.getCheckpointSpi(),
dep.classLoaderId(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 08c9219..b6f2081 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -250,7 +250,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
/**
* Collection of utility methods used throughout the system.
*/
-@SuppressWarnings({"UnusedReturnValue", "UnnecessaryFullyQualifiedName"})
+@SuppressWarnings({"UnusedReturnValue", "UnnecessaryFullyQualifiedName", "RedundantStringConstructorCall"})
public abstract class IgniteUtils {
/** Unsafe. */
private static final Unsafe UNSAFE = GridUnsafe.unsafe();
@@ -489,6 +489,16 @@ public abstract class IgniteUtils {
/** Object.toString() */
private static Method toStringMtd;
+ /** Sentinel meaning "no local Ignite name set"; a distinct instance ({@code new String()}) so it can be recognized by identity. */
+ private static final String LOC_IGNITE_NAME_EMPTY = new String();
+
+ /** Per-thread local Ignite name; every thread starts with the {@link #LOC_IGNITE_NAME_EMPTY} sentinel. */
+ private static final ThreadLocal<String> LOC_IGNITE_NAME = new ThreadLocal<String>() {
+ @Override protected String initialValue() {
+ return LOC_IGNITE_NAME_EMPTY;
+ }
+ };
+
/**
* Initializes enterprise check.
*/
@@ -9555,4 +9565,47 @@ public abstract class IgniteUtils {
public static boolean isToStringMethod(Method mtd) {
return toStringMtd.equals(mtd);
}
+
+ /**
+ * Gets the Ignite name stored in the thread local for the calling thread.
+ *
+ * @return Calling thread's Ignite name; use {@link #isCurrentIgniteNameSet(String)} to tell whether one was actually set.
+ */
+ @Nullable public static String getCurrentIgniteName() {
+ return LOC_IGNITE_NAME.get();
+ }
+
+ /**
+ * Checks whether the given name is an actual value rather than the "not set" sentinel (compared by reference, not by content).
+ *
+ * @param name Name to check, e.g. a value returned by {@link #getCurrentIgniteName()}; {@code null} counts as set.
+ * @return {@code True} if {@code name} is any reference other than the sentinel.
+ */
+ @SuppressWarnings("StringEquality")
+ public static boolean isCurrentIgniteNameSet(@Nullable String name) {
+ return name != LOC_IGNITE_NAME_EMPTY;
+ }
+
+ /**
+ * Stores a new Ignite name in the thread local of the calling thread.
+ *
+ * @param newName Name to store.
+ * @return Value held before this call; pass it to {@link #restoreCurrentIgniteName(String)} to undo the change.
+ */
+ @Nullable public static String setCurrentIgniteName(@Nullable String newName) {
+ String oldName = LOC_IGNITE_NAME.get();
+
+ LOC_IGNITE_NAME.set(newName);
+
+ return oldName;
+ }
+
+ /**
+ * Puts a previously saved Ignite name back into the thread local of the calling thread.
+ *
+ * @param oldName Name to put back, normally the value returned by {@link #setCurrentIgniteName(String)}.
+ */
+ public static void restoreCurrentIgniteName(@Nullable String oldName) {
+ LOC_IGNITE_NAME.set(oldName);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index 6820dc7..262140a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -73,10 +73,12 @@ public class IpcToNioAdapter<T> {
* @param endp Endpoint.
* @param lsnr Listener.
* @param writerFactory Writer factory.
+ * @param gridName Grid name.
* @param filters Filters.
*/
public IpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger log, IpcEndpoint endp,
- GridNioServerListener<T> lsnr, GridNioMessageWriterFactory writerFactory, GridNioFilter... filters) {
+ GridNioServerListener<T> lsnr, GridNioMessageWriterFactory writerFactory,
+ String gridName, GridNioFilter... filters) {
assert metricsLsnr != null;
this.metricsLsnr = metricsLsnr;
@@ -84,7 +86,7 @@ public class IpcToNioAdapter<T> {
this.writerFactory = writerFactory;
chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
- ses = new GridNioSessionImpl(chain, null, null, true);
+ ses = new GridNioSessionImpl(chain, null, null, true, gridName);
writeBuf = ByteBuffer.allocate(8 << 10);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index ac55a14..7470759 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -47,6 +47,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -196,6 +197,9 @@ public class GridNioServer<T> {
}
}
+ /** Grid name */
+ private String gridName;
+
/**
* @param addr Address.
* @param port Port.
@@ -260,6 +264,7 @@ public class GridNioServer<T> {
this.sockSndBuf = sockSndBuf;
this.sndQueueLimit = sndQueueLimit;
this.msgQueueLsnr = msgQueueLsnr;
+ this.gridName = gridName;
filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
@@ -1639,7 +1644,8 @@ public class GridNioServer<T> {
req.accepted(),
sndQueueLimit,
writeBuf,
- readBuf);
+ readBuf,
+ gridName);
Map<Integer, ?> meta = req.meta();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index e4a7225..74ac775 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.util.nio;
import java.net.InetSocketAddress;
+
import org.jetbrains.annotations.Nullable;
/**
@@ -164,4 +165,9 @@ public interface GridNioSession {
* @return Recovery descriptor if recovery is supported, {@code null otherwise.}
*/
@Nullable public GridNioRecoveryDescriptor recoveryDescriptor();
+
+ /**
+ * @return Grid name.
+ */
+ public String gridName();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 0bcfe64..100165e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -69,22 +69,28 @@ public class GridNioSessionImpl implements GridNioSession {
/** Accepted flag. */
private final boolean accepted;
+ /** Grid name. */
+ private String gridName;
+
/**
* @param filterChain Chain.
* @param locAddr Local address.
* @param rmtAddr Remote address.
+ * @param gridName Grid name.
* @param accepted {@code True} if this session was initiated from remote host.
*/
public GridNioSessionImpl(
GridNioFilterChain filterChain,
@Nullable InetSocketAddress locAddr,
@Nullable InetSocketAddress rmtAddr,
- boolean accepted
+ boolean accepted,
+ String gridName
) {
this.filterChain = filterChain;
this.locAddr = locAddr;
this.rmtAddr = rmtAddr;
this.accepted = accepted;
+ this.gridName = gridName;
long now = U.currentTimeMillis();
@@ -309,4 +315,9 @@ public class GridNioSessionImpl implements GridNioSession {
@Override public String toString() {
return S.toString(GridNioSessionImpl.class, this);
}
+
+ /** {@inheritDoc} */
+ @Override public String gridName() {
+ return gridName;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 360b3d4..7b2a8f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -77,6 +77,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
* @param sndQueueLimit Send queue limit.
* @param writeBuf Write buffer.
* @param readBuf Read buffer.
+ * @param gridName Grid name.
*/
GridSelectorNioSessionImpl(
IgniteLogger log,
@@ -87,9 +88,10 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
boolean accepted,
int sndQueueLimit,
@Nullable ByteBuffer writeBuf,
- @Nullable ByteBuffer readBuf
+ @Nullable ByteBuffer readBuf,
+ String gridName
) {
- super(filterChain, locAddr, rmtAddr, accepted);
+ super(filterChain, locAddr, rmtAddr, accepted, gridName);
assert selectorIdx >= 0;
assert sndQueueLimit >= 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/marshaller/AbstractMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/AbstractMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/AbstractMarshaller.java
index dd5bad0..6c3428e 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/AbstractMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/AbstractMarshaller.java
@@ -17,12 +17,9 @@
package org.apache.ignite.marshaller;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.util.GridByteArrayList;
import org.apache.ignite.internal.util.io.GridByteArrayInputStream;
import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
/**
* Base class for marshallers. Provides default implementations of methods
@@ -37,7 +34,6 @@ public abstract class AbstractMarshaller implements Marshaller {
/** Context. */
protected MarshallerContext ctx;
-
/**
* Undeployment callback invoked when class loader is being undeployed.
*
@@ -47,38 +43,15 @@ public abstract class AbstractMarshaller implements Marshaller {
*/
public abstract void onUndeploy(ClassLoader ldr);
- /** {@inheritDoc} */
- @Override public void setContext(MarshallerContext ctx) {
- this.ctx = ctx;
- }
-
- /** {@inheritDoc} */
- @Override public byte[] marshal(@Nullable Object obj) throws IgniteCheckedException {
- GridByteArrayOutputStream out = null;
-
- try {
- out = new GridByteArrayOutputStream(DFLT_BUFFER_SIZE);
-
- marshal(obj, out);
-
- return out.toByteArray();
- }
- finally {
- U.close(out, null);
- }
+ /**
+ * @return Marshaller context.
+ */
+ public MarshallerContext getContext() {
+ return ctx;
}
/** {@inheritDoc} */
- @Override public <T> T unmarshal(byte[] arr, @Nullable ClassLoader clsLdr) throws IgniteCheckedException {
- GridByteArrayInputStream in = null;
-
- try {
- in = new GridByteArrayInputStream(arr, 0, arr.length);
-
- return unmarshal(in, clsLdr);
- }
- finally {
- U.close(in, null);
- }
+ @Override public void setContext(MarshallerContext ctx) {
+ this.ctx = ctx;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
new file mode 100644
index 0000000..0775622
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.marshaller;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Static helpers that run marshaller calls with the current Ignite name thread local set, restoring the previous value afterwards.
+ */
+public class MarshallerUtils {
+ /**
+ * Marshals an object to a byte array while the provided grid name is set as the current Ignite name.
+ *
+ * @param name Grid name to set for the duration of the call.
+ * @param marsh Marshaller that performs the actual marshalling.
+ * @param obj Object to marshal.
+ * @return Bytes produced by {@code marsh.marshal(obj)}.
+ * @throws IgniteCheckedException If the marshaller failed; the previous name is restored in that case as well.
+ */
+ public static byte[] marshal(String name, Marshaller marsh, @Nullable Object obj) throws IgniteCheckedException {
+ String oldName = IgniteUtils.setCurrentIgniteName(name);
+
+ try {
+ return marsh.marshal(obj);
+ }
+ finally {
+ IgniteUtils.restoreCurrentIgniteName(oldName);
+ }
+ }
+
+ /**
+ * Marshals an object to a stream while the provided grid name is set as the current Ignite name.
+ *
+ * @param name Grid name to set for the duration of the call.
+ * @param marshaller Marshaller that performs the actual marshalling.
+ * @param obj Object to marshal.
+ * @param out Output stream the marshaller writes to; it is not closed by this method.
+ * @throws IgniteCheckedException If the marshaller failed; the previous name is restored in that case as well.
+ */
+ public static void marshal(String name, Marshaller marshaller, @Nullable Object obj, OutputStream out)
+ throws IgniteCheckedException {
+ String oldName = IgniteUtils.setCurrentIgniteName(name);
+
+ try {
+ marshaller.marshal(obj, out);
+ }
+ finally {
+ IgniteUtils.restoreCurrentIgniteName(oldName);
+ }
+ }
+
+ /**
+ * Marshals an object using the grid name and the configured marshaller of the provided kernal context.
+ *
+ * @param ctx Kernal context supplying {@code gridName()} and {@code config().getMarshaller()}.
+ * @param obj Object to marshal.
+ * @return Bytes produced by the context's marshaller.
+ * @throws IgniteCheckedException If the marshaller failed.
+ */
+ public static byte[] marshal(GridKernalContext ctx, @Nullable Object obj) throws IgniteCheckedException {
+ return marshal(ctx.gridName(), ctx.config().getMarshaller(), obj);
+ }
+
+ /**
+ * Unmarshals an object from a byte array while the provided grid name is set as the current Ignite name.
+ *
+ * @param name Grid name to set for the duration of the call.
+ * @param marsh Marshaller that performs the actual unmarshalling.
+ * @param arr Binary data to unmarshal.
+ * @param ldr Class loader handed to the marshaller.
+ * @return Object returned by {@code marsh.unmarshal(arr, ldr)}.
+ * @throws IgniteCheckedException If the marshaller failed; the previous name is restored in that case as well.
+ */
+ public static <T> T unmarshal(String name, Marshaller marsh, byte[] arr, @Nullable ClassLoader ldr)
+ throws IgniteCheckedException {
+ String oldName = IgniteUtils.setCurrentIgniteName(name);
+
+ try {
+ return marsh.unmarshal(arr, ldr);
+ }
+ finally {
+ IgniteUtils.restoreCurrentIgniteName(oldName);
+ }
+ }
+
+ /**
+ * Unmarshals an object from a stream while the provided grid name is set as the current Ignite name.
+ *
+ * @param name Grid name to set for the duration of the call.
+ * @param marsh Marshaller that performs the actual unmarshalling.
+ * @param in Input stream the marshaller reads from; it is not closed by this method.
+ * @param ldr Class loader handed to the marshaller.
+ * @return Object returned by {@code marsh.unmarshal(in, ldr)}.
+ * @throws IgniteCheckedException If the marshaller failed; the previous name is restored in that case as well.
+ */
+ public static <T> T unmarshal(String name, Marshaller marsh, InputStream in, @Nullable ClassLoader ldr)
+ throws IgniteCheckedException {
+ String oldName = IgniteUtils.setCurrentIgniteName(name);
+
+ try {
+ return marsh.unmarshal(in, ldr);
+ }
+ finally {
+ IgniteUtils.restoreCurrentIgniteName(oldName);
+ }
+ }
+
+ /**
+ * Clones an object by marshalling and then unmarshalling it, with the grid name set for both steps.
+ *
+ * @param name Grid name to set for the duration of the call.
+ * @param marsh Marshaller used for both the marshal and the unmarshal step.
+ * @param obj Object to clone.
+ * @param clsLdr Class loader handed to the marshaller for the unmarshal step.
+ * @return Object obtained by unmarshalling the marshalled form of {@code obj}.
+ * @throws IgniteCheckedException If either step failed; the previous name is restored in that case as well.
+ */
+ public static <T> T marshalUnmarshal(String name, Marshaller marsh, T obj, @Nullable ClassLoader clsLdr)
+ throws IgniteCheckedException {
+ String oldName = IgniteUtils.setCurrentIgniteName(name);
+
+ try {
+ return marsh.unmarshal(marsh.marshal(obj), clsLdr);
+ }
+ finally {
+ IgniteUtils.restoreCurrentIgniteName(oldName);
+ }
+ }
+
+ /**
+ * Private constructor; the class only exposes static helpers.
+ */
+ private MarshallerUtils() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
index deb3953..c8bb383 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
@@ -23,6 +23,8 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.io.GridByteArrayInputStream;
+import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.AbstractMarshaller;
@@ -87,6 +89,22 @@ public class JdkMarshaller extends AbstractMarshaller {
}
/** {@inheritDoc} */
+ @Override public byte[] marshal(@Nullable Object obj) throws IgniteCheckedException {
+ GridByteArrayOutputStream out = null;
+
+ try {
+ out = new GridByteArrayOutputStream(DFLT_BUFFER_SIZE);
+
+ marshal(obj, out);
+
+ return out.toByteArray();
+ }
+ finally {
+ U.close(out, null);
+ }
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
@Override public <T> T unmarshal(InputStream in, @Nullable ClassLoader clsLdr) throws IgniteCheckedException {
assert in != null;
@@ -115,6 +133,20 @@ public class JdkMarshaller extends AbstractMarshaller {
}
/** {@inheritDoc} */
+ @Override public <T> T unmarshal(byte[] arr, @Nullable ClassLoader clsLdr) throws IgniteCheckedException {
+ GridByteArrayInputStream in = null;
+
+ try {
+ in = new GridByteArrayInputStream(arr, 0, arr.length);
+
+ return unmarshal(in, clsLdr);
+ }
+ finally {
+ U.close(in, null);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void onUndeploy(ClassLoader ldr) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java
index 36a4ea6..7a12582 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsCheckpointSpi.java
@@ -323,7 +323,7 @@ public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements Checkpoin
log.debug("Checking checkpoint file: " + file.getAbsolutePath());
try {
- SharedFsCheckpointData data = SharedFsUtils.read(file, marsh, log);
+ SharedFsCheckpointData data = SharedFsUtils.read(file, marsh, log, ignite.configuration());
if (data.getHost().equals(host)) {
files.put(file, new SharedFsTimeData(data.getExpireTime(), file.lastModified(),
@@ -386,7 +386,7 @@ public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements Checkpoin
if (file.exists())
try {
- SharedFsCheckpointData data = SharedFsUtils.read(file, marsh, log);
+ SharedFsCheckpointData data = SharedFsUtils.read(file, marsh, log, ignite.configuration());
return data != null ?
data.getExpireTime() == 0 || data.getExpireTime() > U.currentTimeMillis() ?
@@ -434,7 +434,7 @@ public class SharedFsCheckpointSpi extends IgniteSpiAdapter implements Checkpoin
try {
SharedFsUtils.write(file, new SharedFsCheckpointData(state, expireTime, host, key),
- marsh, log);
+ marsh, log, ignite.configuration());
}
catch (IOException e) {
// Select next shared directory if exists, otherwise throw exception
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsTimeoutTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsTimeoutTask.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsTimeoutTask.java
index 3cf11f8..07619c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsTimeoutTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsTimeoutTask.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
@@ -116,8 +117,13 @@ class SharedFsTimeoutTask extends IgniteSpiThread {
SharedFsTimeData timeData = entry.getValue();
try {
- if (timeData.getLastAccessTime() != file.lastModified())
- timeData.setExpireTime(SharedFsUtils.read(file, marshaller, log).getExpireTime());
+ if (timeData.getLastAccessTime() != file.lastModified()) {
+ final IgniteConfiguration igniteCfg = new IgniteConfiguration();
+
+ igniteCfg.setGridName(getGridName());
+
+ timeData.setExpireTime(SharedFsUtils.read(file, marshaller, log, igniteCfg).getExpireTime());
+ }
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal/unmarshal in checkpoint file: " + file.getAbsolutePath(), e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsUtils.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsUtils.java
index b1698fa..29a6a36 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsUtils.java
@@ -25,8 +25,10 @@ import java.io.InputStream;
import java.io.OutputStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
/**
* Utility class that helps to manage files. It provides read/write
@@ -52,7 +54,7 @@ final class SharedFsUtils {
* to {@link SharedFsCheckpointData} object.
* @throws IOException Thrown if file read error occurred.
*/
- static SharedFsCheckpointData read(File file, Marshaller m, IgniteLogger log)
+ static SharedFsCheckpointData read(File file, Marshaller m, IgniteLogger log, final IgniteConfiguration igniteCfg)
throws IOException, IgniteCheckedException {
assert file != null;
assert m != null;
@@ -61,7 +63,7 @@ final class SharedFsUtils {
InputStream in = new FileInputStream(file);
try {
- return (SharedFsCheckpointData)m.unmarshal(in, U.gridClassLoader());
+ return (SharedFsCheckpointData) MarshallerUtils.unmarshal(igniteCfg.getGridName(), m, in, U.gridClassLoader());
}
finally {
U.close(in, log);
@@ -76,11 +78,12 @@ final class SharedFsUtils {
* @param data Checkpoint data.
* @param m Grid marshaller.
* @param log Messages logger.
+ * @param igniteCfg Ignite config.
* @throws IgniteCheckedException Thrown if data could not be marshalled.
* @throws IOException Thrown if file write operation failed.
*/
- static void write(File file, SharedFsCheckpointData data, Marshaller m, IgniteLogger log)
- throws IOException, IgniteCheckedException {
+ static void write(File file, SharedFsCheckpointData data, Marshaller m, IgniteLogger log,
+ final IgniteConfiguration igniteCfg) throws IOException, IgniteCheckedException {
assert file != null;
assert m != null;
assert data != null;
@@ -91,7 +94,7 @@ final class SharedFsUtils {
try {
out = new FileOutputStream(file);
- m.marshal(data, out);
+ MarshallerUtils.marshal(igniteCfg.getGridName(), m, data, out);
}
finally {
U.close(out, log);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index ed29b59..5938db6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3013,6 +3013,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
endpoint,
srvLsnr,
writerFactory,
+ gridName,
new GridNioCodecFilter(new GridDirectParser(msgFactory, readerFactory), log, true),
new GridConnectionBytesVerifyFilter(log)
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 79e58b1..78a1911 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
@@ -429,8 +430,7 @@ class ClientImpl extends TcpDiscoveryImpl {
throw new IgniteClientDisconnectedException(null, "Failed to send custom message: client is disconnected.");
try {
- sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
- spi.marsh.marshal(evt)));
+ sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marshal(evt)));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -683,8 +683,10 @@ class ClientImpl extends TcpDiscoveryImpl {
// Use security-unsafe getter.
Map<String, Object> attrs = new HashMap<>(node.getAttributes());
- attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
- spi.marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
+ attrs.put(
+ IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
+ marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))
+ );
node.setAttributes(attrs);
}
@@ -885,7 +887,7 @@ class ClientImpl extends TcpDiscoveryImpl {
TcpDiscoveryAbstractMessage msg;
try {
- msg = spi.marsh.unmarshal(in, U.resolveClassLoader(spi.ignite().configuration()));
+ msg = unmarshal(in);
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -1210,8 +1212,7 @@ class ClientImpl extends TcpDiscoveryImpl {
List<TcpDiscoveryAbstractMessage> msgs = null;
while (!isInterrupted()) {
- TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in,
- U.resolveClassLoader(spi.ignite().configuration()));
+ TcpDiscoveryAbstractMessage msg = unmarshal(in);
if (msg instanceof TcpDiscoveryClientReconnectMessage) {
TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
@@ -1965,7 +1966,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (node != null && node.visible()) {
try {
DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh,
- U.resolveClassLoader(spi.ignite().configuration()));
+ U.resolveClassLoader(spi.ignite().configuration()), spi.ignite().name());
notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 7f689c5..9211722 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -65,6 +65,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
@@ -90,6 +91,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.plugin.security.SecurityPermissionSet;
import org.apache.ignite.spi.IgniteNodeValidationResult;
@@ -740,7 +742,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
try {
- msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, spi.marsh.marshal(evt)));
+ msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marshal(evt)));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -823,7 +825,7 @@ class ServerImpl extends TcpDiscoveryImpl {
Map<String, Object> attrs = new HashMap<>(locNode.attributes());
- attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, spi.marsh.marshal(subj));
+ attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, marshal(subj));
attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
locNode.setAttributes(attrs);
@@ -1238,8 +1240,10 @@ class ServerImpl extends TcpDiscoveryImpl {
// Use security-unsafe getter.
Map<String, Object> attrs = new HashMap<>(node.getAttributes());
- attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
- spi.marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
+ attrs.put(
+ IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
+ marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))
+ );
node.setAttributes(attrs);
}
@@ -1262,7 +1266,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (credBytes == null)
return null;
- return spi.marsh.unmarshal(credBytes, null);
+ return MarshallerUtils.unmarshal(spi.ignite().name(), spi.marsh, credBytes, null);
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e);
@@ -2356,7 +2360,7 @@ class ServerImpl extends TcpDiscoveryImpl {
for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
if (msgBytes == null) {
try {
- msgBytes = spi.marsh.marshal(msg);
+ msgBytes = marshal(msg);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal message: " + msg, e);
@@ -2375,7 +2379,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (clientMsgWorker.clientNodeId.equals(node.id())) {
try {
- msg0 = spi.marsh.unmarshal(msgBytes,
+ msg0 = MarshallerUtils.unmarshal(spi.ignite().name(), spi.marsh, msgBytes,
U.resolveClassLoader(spi.ignite().configuration()));
prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null, null, null);
@@ -3134,7 +3138,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Stick in authentication subject to node (use security-safe attributes for copy).
Map<String, Object> attrs = new HashMap<>(node.getAttributes());
- attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, spi.marsh.marshal(subj));
+ attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, marshal(subj));
node.setAttributes(attrs);
}
@@ -3785,9 +3789,11 @@ class ServerImpl extends TcpDiscoveryImpl {
else {
SecurityContext subj = spi.nodeAuth.authenticateNode(node, cred);
- SecurityContext coordSubj = spi.marsh.unmarshal(
+ final IgniteConfiguration cfg = spi.ignite().configuration();
+
+ SecurityContext coordSubj = MarshallerUtils.unmarshal(cfg.getGridName(), spi.marsh,
node.<byte[]>attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT),
- U.resolveClassLoader(spi.ignite().configuration()));
+ U.resolveClassLoader(cfg));
if (!permissionsEqual(coordSubj.subject().permissions(), subj.subject().permissions())) {
// Node has not pass authentication.
@@ -4838,7 +4844,8 @@ class ServerImpl extends TcpDiscoveryImpl {
DiscoverySpiCustomMessage msgObj = null;
try {
- msgObj = msg.message(spi.marsh, U.resolveClassLoader(spi.ignite().configuration()));
+ msgObj = msg.message(spi.marsh, U.resolveClassLoader(spi.ignite().configuration()),
+ spi.ignite().name());
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -4849,8 +4856,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (nextMsg != null) {
try {
- TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage(
- getLocalNodeId(), nextMsg, spi.marsh.marshal(nextMsg));
+ TcpDiscoveryCustomEventMessage ackMsg =
+ new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg, marshal(nextMsg));
ackMsg.topologyVersion(msg.topologyVersion());
@@ -4981,8 +4988,10 @@ class ServerImpl extends TcpDiscoveryImpl {
if (node != null) {
try {
+ final IgniteConfiguration cfg = spi.ignite().configuration();
+
DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh,
- U.resolveClassLoader(spi.ignite().configuration()));
+ U.resolveClassLoader(cfg), cfg.getGridName());
lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
msg.topologyVersion(),
@@ -4992,7 +5001,7 @@ class ServerImpl extends TcpDiscoveryImpl {
msgObj);
if (msgObj.isMutable())
- msg.message(msgObj, spi.marsh.marshal(msgObj));
+ msg.message(msgObj, marshal(msgObj));
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -5428,8 +5437,7 @@ class ServerImpl extends TcpDiscoveryImpl {
while (!isInterrupted()) {
try {
- TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in,
- U.resolveClassLoader(spi.ignite().configuration()));
+ TcpDiscoveryAbstractMessage msg = unmarshal(in);
msg.senderNodeId(nodeId);
@@ -5919,7 +5927,7 @@ class ServerImpl extends TcpDiscoveryImpl {
byte[] msgBytes = msgT.get2();
if (msgBytes == null)
- msgBytes = spi.marsh.marshal(msg);
+ msgBytes = marshal(msg);
if (msg instanceof TcpDiscoveryClientAckResponse) {
if (clientVer == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 30b83e5..d049314 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.discovery.tcp;
+import java.io.InputStream;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -26,12 +27,15 @@ import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiThread;
@@ -326,4 +330,27 @@ abstract class TcpDiscoveryImpl {
return res;
}
+
+ /**
+ * Marshal object.
+ *
+ * @param obj Object.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected byte[] marshal(Object obj) throws IgniteCheckedException {
+ return MarshallerUtils.marshal(spi.ignite().name(), spi.marsh, obj);
+ }
+
+ /**
+ * Unmarshal object.
+ *
+ * @param in Input stream.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected <T> T unmarshal(InputStream in) throws IgniteCheckedException {
+ return MarshallerUtils.unmarshal(spi.ignite().configuration().getGridName(), spi.marsh, in,
+ U.resolveClassLoader(spi.ignite().configuration()));
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 308830e..1fb5482 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -64,6 +64,7 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
@@ -1377,7 +1378,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
IgniteCheckedException err = null;
try {
- marsh.marshal(msg, out);
+ MarshallerUtils.marshal(ignite.name(), marsh, msg, out);
}
catch (IgniteCheckedException e) {
err = e;
@@ -1461,10 +1462,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
try {
sock.setSoTimeout((int)timeout);
- T res = marsh.unmarshal(in == null ? sock.getInputStream() : in,
- U.resolveClassLoader(ignite.configuration()));
-
- return res;
+ return MarshallerUtils.unmarshal(ignite.configuration().getGridName(), marsh,
+ in == null ? sock.getInputStream() : in, U.resolveClassLoader(ignite.configuration()));
}
catch (IOException | IgniteCheckedException e) {
if (X.hasCause(e, SocketTimeoutException.class))
@@ -1679,7 +1678,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
for (Map.Entry<Integer, Serializable> entry : data.entrySet()) {
try {
- byte[] bytes = marsh.marshal(entry.getValue());
+ byte[] bytes = MarshallerUtils.marshal(ignite.name(), marsh, entry.getValue());
data0.put(entry.getKey(), bytes);
}
@@ -1709,7 +1708,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
try {
- Serializable compData = marsh.unmarshal(entry.getValue(), clsLdr);
+ Serializable compData = MarshallerUtils.unmarshal(ignite.name(), marsh, entry.getValue(), clsLdr
+ );
data0.put(entry.getKey(), compData);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index 8b29efd..ba19ade 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiConfiguration;
@@ -593,7 +594,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
AddressResponse addrRes;
try {
- addrRes = new AddressResponse(data);
+ addrRes = new AddressResponse(data, getGridName());
}
catch (IgniteCheckedException e) {
LT.warn(log, e, "Failed to deserialize multicast response.");
@@ -640,6 +641,15 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
}
}
+ /**
+ * Gets grid name if the Ignite instance is available.
+ *
+ * @return Grid name or {@code null}.
+ */
+ @Nullable private String getGridName() {
+ return ignite == null ? null : ignite.name();
+ }
+
/** {@inheritDoc} */
@Override public void close() {
if (addrSnds != null) {
@@ -688,14 +698,16 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
* @param addrs Addresses discovery SPI binds to.
* @throws IgniteCheckedException If marshalling failed.
*/
- private AddressResponse(Collection<InetSocketAddress> addrs) throws IgniteCheckedException {
+ private AddressResponse(Collection<InetSocketAddress> addrs, final String gridName) throws IgniteCheckedException {
this.addrs = addrs;
- byte[] addrsData = marsh.marshal(addrs);
+ byte[] addrsData = MarshallerUtils.marshal(gridName, marsh, addrs);
+
data = new byte[U.IGNITE_HEADER.length + addrsData.length];
if (data.length > MAX_DATA_LENGTH)
- throw new IgniteCheckedException("Too long data packet [size=" + data.length + ", max=" + MAX_DATA_LENGTH + "]");
+ throw new IgniteCheckedException("Too long data packet [size=" + data.length +
+ ", max=" + MAX_DATA_LENGTH + "]");
System.arraycopy(U.IGNITE_HEADER, 0, data, 0, U.IGNITE_HEADER.length);
System.arraycopy(addrsData, 0, data, 4, addrsData.length);
@@ -703,14 +715,16 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
/**
* @param data Message data.
+ * @param gridName Grid name.
* @throws IgniteCheckedException If unmarshalling failed.
*/
- private AddressResponse(byte[] data) throws IgniteCheckedException {
+ private AddressResponse(byte[] data, final String gridName) throws IgniteCheckedException {
assert U.bytesEqual(U.IGNITE_HEADER, 0, data, 0, U.IGNITE_HEADER.length);
this.data = data;
- addrs = marsh.unmarshal(Arrays.copyOfRange(data, U.IGNITE_HEADER.length, data.length), null);
+ addrs = MarshallerUtils.unmarshal(gridName, marsh,
+ Arrays.copyOfRange(data, U.IGNITE_HEADER.length, data.length), null);
}
/**
@@ -829,7 +843,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
AddressResponse res;
try {
- res = new AddressResponse(addrs);
+ res = new AddressResponse(addrs, gridName);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to prepare multicast message.", e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index ca5dd56..897a4ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -18,9 +18,11 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.UUID;
+
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -70,20 +72,26 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
}
/**
- * @return Deserialized message,
+ * @param marsh Marshaller.
+ * @param gridName Grid name.
+ * @return Deserialized message.
* @throws java.lang.Throwable if unmarshal failed.
*/
- @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh) throws Throwable {
- return message(marsh, null);
+ @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, String gridName) throws Throwable {
+ return message(marsh, null, gridName);
}
/**
- * @return Deserialized message,
+ * @param marsh Marshaller.
+ * @param ldr Class loader.
+ * @param gridName Grid name.
+ * @return Deserialized message.
* @throws java.lang.Throwable if unmarshal failed.
*/
- @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable {
+ @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr,
+ final String gridName) throws Throwable {
if (msg == null) {
- msg = marsh.unmarshal(msgBytes, ldr);
+ msg = MarshallerUtils.unmarshal(gridName, marsh, msgBytes, ldr);
assert msg != null;
}
@@ -104,4 +112,4 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
@Override public String toString() {
return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString());
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index df35ed3..052a422 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiCloseableIterator;
@@ -594,7 +595,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
if (keyBytes == null) {
try {
- keyBytes = ignite.configuration().getMarshaller().marshal(key.key());
+ keyBytes = MarshallerUtils.marshal(ignite.name(), ignite.configuration().getMarshaller(), key.key());
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal key: " + key.key(), e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
index 03e82e3..9e2ca67 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
@@ -25,7 +25,8 @@ public interface SocketMessageConverter<T> {
* Converter message represented by array of bytes to object.
*
* @param msg Message.
+ * @param gridName Grid name.
* @return Converted object.
*/
- public T convert(byte[] msg);
+ public T convert(byte[] msg, String gridName);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 066a5fd..818ae6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.stream.StreamAdapter;
import org.apache.ignite.stream.StreamTupleExtractor;
@@ -163,7 +164,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
}
@Override public void onMessage(GridNioSession ses, byte[] msg) {
- addMessage(converter.convert(msg));
+ addMessage(converter.convert(msg, ses.gridName()));
}
};
@@ -218,9 +219,9 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
private static final JdkMarshaller MARSH = new JdkMarshaller();
/** {@inheritDoc} */
- @Override public T convert(byte[] msg) {
+ @Override public T convert(byte[] msg, final String gridName) {
try {
- return MARSH.unmarshal(msg, null);
+ return MarshallerUtils.unmarshal(gridName, MARSH, msg, null);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridLocalIgniteSerializationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridLocalIgniteSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridLocalIgniteSerializationTest.java
new file mode 100644
index 0000000..8531f92
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridLocalIgniteSerializationTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+
+/**
+ * Test for local Ignite instance processing during serialization/deserialization.
+ */
+public class GridLocalIgniteSerializationTest extends GridCommonAbstractTest {
+ /** */
+ private static final String CACHE_NAME = "cache_name";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+ final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (gridName != null && gridName.startsWith("binary"))
+ cfg.setMarshaller(new BinaryMarshaller());
+
+ return cfg;
+ }
+
+ /**
+ * Test that calling {@link Ignition#localIgnite()}
+ * is safe for binary marshaller.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutGetSimple() throws Exception {
+ testPutGet(new SimpleTestObject("one"), null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutGetSerializable() throws Exception {
+ testPutGet(new SerializableTestObject("test"), null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutGetExternalizable() throws Exception {
+ testPutGet(new ExternalizableTestObject("test"), null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutGetBinarylizable() throws Exception {
+ testPutGet(new BinarylizableTestObject("test"), "binaryIgnite");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void testPutGet(final TestObject obj, final String gridName) throws Exception {
+ // Run async to emulate user thread.
+ GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (final Ignite ignite = startGrid(gridName)) {
+ final IgniteCache<Integer, TestObject> cache = ignite.getOrCreateCache(CACHE_NAME);
+
+ assertNull(obj.ignite());
+
+ cache.put(1, obj);
+
+ assertNotNull(obj.ignite());
+
+ final TestObject loadedObj = cache.get(1);
+
+ assertNotNull(loadedObj.ignite());
+
+ assertEquals(obj, loadedObj);
+ }
+
+ return null;
+ }
+ }).get();
+ }
+
+ /**
+ *
+ */
+ private interface TestObject {
+ /**
+ * @return Ignite instance.
+ */
+ Ignite ignite();
+ }
+
+ /**
+ * Test object.
+ */
+ private static class SimpleTestObject implements TestObject {
+ /** */
+ private final String val;
+
+ /** */
+ private transient Ignite ignite;
+
+ /** */
+ private SimpleTestObject(final String val) {
+ this.val = val;
+ }
+
+ /**
+ * @return Object.
+ */
+ @SuppressWarnings("unused")
+ private Object readResolve() {
+ ignite = Ignition.localIgnite();
+
+ return this;
+ }
+
+ /**
+ * @return Object.
+ */
+ @SuppressWarnings("unused")
+ private Object writeReplace() {
+ ignite = Ignition.localIgnite();
+
+ return this;
+ }
+
+ /** */
+ @Override public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ final SimpleTestObject simpleTestObj = (SimpleTestObject) o;
+
+ return val != null ? val.equals(simpleTestObj.val) : simpleTestObj.val == null;
+
+ }
+
+ /** */
+ @Override public int hashCode() {
+ return val != null ? val.hashCode() : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Ignite ignite() {
+ return ignite;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class SerializableTestObject implements Serializable, TestObject {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private String val;
+
+ /** */
+ private transient Ignite ignite;
+
+ /**
+ *
+ */
+ public SerializableTestObject() {
+ }
+
+ /**
+ * @param val Value.
+ */
+ public SerializableTestObject(final String val) {
+ this.val = val;
+ }
+
+ /**
+ * @param out Object output.
+ * @throws IOException If failed.
+ */
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ U.writeString(out, val);
+
+ ignite = Ignition.localIgnite();
+ }
+
+ /**
+ * @param in Object input.
+ * @throws IOException If fail.
+ */
+ private void readObject(ObjectInputStream in) throws IOException {
+ val = U.readString(in);
+
+ ignite = Ignition.localIgnite();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ final SerializableTestObject that = (SerializableTestObject) o;
+
+ return val != null ? val.equals(that.val) : that.val == null;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val != null ? val.hashCode() : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Ignite ignite() {
+ return ignite;
+ }
+ }
+
+ /**
+ * {@link Externalizable} test object whose writeExternal()/readExternal() record the local Ignite instance.
+ */
+ private static class ExternalizableTestObject implements Externalizable, TestObject {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Value; written and read explicitly, and the only state used by equals() and hashCode(). */
+ private String val;
+
+ /** Ignite instance recorded by writeExternal()/readExternal(); transient. */
+ private transient Ignite ignite;
+
+ /**
+ * Creates an object with {@code val} left unset.
+ */
+ public ExternalizableTestObject() {
+ }
+
+ /**
+ * @param val Value.
+ */
+ public ExternalizableTestObject(final String val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(final ObjectOutput out) throws IOException {
+ U.writeString(out, val);
+
+ ignite = Ignition.localIgnite();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ val = U.readString(in);
+
+ ignite = Ignition.localIgnite();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ final ExternalizableTestObject that = (ExternalizableTestObject) o;
+
+ return val != null ? val.equals(that.val) : that.val == null;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val != null ? val.hashCode() : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Ignite ignite() {
+ return ignite;
+ }
+ }
+
+ /**
+ * {@link Binarylizable} test object whose writeBinary()/readBinary() record the local Ignite instance.
+ */
+ private static class BinarylizableTestObject implements Binarylizable, TestObject {
+ /** Value; written and read through the raw writer/reader, and the only state used by equals() and hashCode(). */
+ private String val;
+
+ /** Ignite instance recorded by writeBinary()/readBinary(); transient. */
+ private transient Ignite ignite;
+
+ /**
+ * Creates an object with {@code val} left unset.
+ */
+ public BinarylizableTestObject() {
+ }
+
+ /**
+ * @param val Value.
+ */
+ public BinarylizableTestObject(final String val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(final BinaryWriter writer) throws BinaryObjectException {
+ writer.rawWriter().writeString(val);
+
+ ignite = Ignition.localIgnite();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(final BinaryReader reader) throws BinaryObjectException {
+ val = reader.rawReader().readString();
+
+ ignite = Ignition.localIgnite();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ final BinarylizableTestObject that = (BinarylizableTestObject) o;
+
+ return val != null ? val.equals(that.val) : that.val == null;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val != null ? val.hashCode() : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Ignite ignite() {
+ return ignite;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
index 201fd27..61d93c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
@@ -377,5 +377,10 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
@Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public String gridName() {
+ return null; // This test implementation supplies no grid name.
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 32d9072..7014608 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -1964,7 +1964,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
if (msg instanceof TcpDiscoveryCustomEventMessage) {
try {
DiscoveryCustomMessage custMsg = GridTestUtils.getFieldValue(
- ((TcpDiscoveryCustomEventMessage)msg).message(marsh), "delegate");
+ ((TcpDiscoveryCustomEventMessage)msg).message(marsh, null), "delegate");
if (custMsg instanceof StartRoutineAckDiscoveryMessage) {
log.info("Skip message send and stop node: " + msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 1056990..a59ebea 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -169,7 +169,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
*/
public void testSizeBasedCustomConverter() throws Exception {
SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>() {
- @Override public Message convert(byte[] msg) {
+ @Override public Message convert(byte[] msg, String gridName) {
int i = (msg[0] & 0xFF) << 24;
i |= (msg[1] & 0xFF) << 16;
i |= (msg[2] & 0xFF) << 8;
@@ -233,7 +233,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
*/
public void testDelimiterBasedCustomConverter() throws Exception {
SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>() {
- @Override public Message convert(byte[] msg) {
+ @Override public Message convert(byte[] msg, String gridName) {
int i = (msg[0] & 0xFF) << 24;
i |= (msg[1] & 0xFF) << 16;
i |= (msg[2] & 0xFF) << 8;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 98a876f..e613c31 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.GridStartStopSelfTest;
import org.apache.ignite.internal.GridStopWithCancelSelfTest;
import org.apache.ignite.internal.IgniteSlowClientDetectionSelfTest;
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRendezvousSelfTest;
+import org.apache.ignite.internal.processors.cache.GridLocalIgniteSerializationTest;
import org.apache.ignite.internal.processors.cache.GridProjectionForCachesOnDaemonNodeSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCacheTest;
import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheConcurrentReadWriteTest;
@@ -117,6 +118,7 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTestSuite(IgniteSlowClientDetectionSelfTest.class);
GridTestUtils.addTestIfNeeded(suite, IgniteDaemonNodeMarshallerCacheTest.class, ignoredTests);
suite.addTestSuite(IgniteMarshallerCacheConcurrentReadWriteTest.class);
+ suite.addTestSuite(GridLocalIgniteSerializationTest.class);
suite.addTestSuite(IgniteExceptionInNioWorkerSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
index 1d59a95..4203069 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
@@ -1179,6 +1179,7 @@ public class HadoopExternalCommunication {
endpoint,
accepted,
srvLsnr,
+ gridName,
filters());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
index a8de999..3d68cbc 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
@@ -62,14 +62,15 @@ public class HadoopIpcToNioAdapter<T> {
* @param log Log.
* @param endp Endpoint.
* @param lsnr Listener.
+ * @param gridname Grid name.
* @param filters Filters.
*/
public HadoopIpcToNioAdapter(IgniteLogger log, IpcEndpoint endp, boolean accepted,
- GridNioServerListener<T> lsnr, GridNioFilter... filters) {
+ GridNioServerListener<T> lsnr, String gridname, GridNioFilter... filters) {
this.endp = endp;
chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
- ses = new GridNioSessionImpl(chain, null, null, accepted);
+ ses = new GridNioSessionImpl(chain, null, null, accepted, gridname);
writeBuf = ByteBuffer.allocate(8 << 10);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5f3abd2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
index 3f79469..2c5c122 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
/**
* Serialization filter.
@@ -59,14 +60,15 @@ public class HadoopMarshallerFilter extends GridNioFilterAdapter {
@Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
assert msg instanceof HadoopMessage : "Invalid message type: " + msg;
- return proceedSessionWrite(ses, marshaller.marshal(msg));
+ return proceedSessionWrite(ses, MarshallerUtils.marshal(ses.gridName(), marshaller, msg));
}
+ /** {@inheritDoc} */
@Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
assert msg instanceof byte[];
// Always unmarshal with system classloader.
- proceedMessageReceived(ses, marshaller.unmarshal((byte[])msg, null));
+ proceedMessageReceived(ses, MarshallerUtils.unmarshal(ses.gridName(), marshaller, (byte[])msg, null));
}
/** {@inheritDoc} */