You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/10 13:40:53 UTC
[11/12] ignite git commit: IGNITE-2700: Closures are now written
using binary marshaller. This closes #518.
IGNITE-2700: Closures are now written using binary marshaller. This closes #518.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/215e8a1e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/215e8a1e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/215e8a1e
Branch: refs/heads/ignite-1786
Commit: 215e8a1e3143bf22d792fdefbd4e6a65b372ae24
Parents: 8cf71d4
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Thu Mar 10 15:37:14 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 10 15:37:14 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/binary/BinaryContext.java | 29 +-
.../internal/binary/BinaryFieldAccessor.java | 1 +
.../closure/GridClosureProcessor.java | 375 ++++++++++++++++-
.../resources/META-INF/classnames.properties | 6 +
.../ignite/internal/GridAffinitySelfTest.java | 4 +-
...omputationBinarylizableClosuresSelfTest.java | 413 +++++++++++++++++++
.../binary/BinaryMarshallerSelfTest.java | 50 +++
...IgniteBinaryObjectsComputeGridTestSuite.java | 7 +-
8 files changed, 858 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index 4df9ba2..b9b633f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -61,6 +61,7 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.binary.BinaryMetadataKey;
+import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.datastructures.CollocatedQueueItemKey;
import org.apache.ignite.internal.processors.datastructures.CollocatedSetItemKey;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -89,6 +90,23 @@ public class BinaryContext {
static final BinaryInternalMapper SIMPLE_NAME_LOWER_CASE_MAPPER =
new BinaryInternalMapper(new BinaryBasicNameMapper(true), new BinaryBasicIdMapper(true), false);
+ /** Set of system classes that should be marshalled with BinaryMarshaller. */
+ private static final Set<String> BINARYLIZABLE_SYS_CLSS;
+
+ /** Binarylizable system classes set initialization. */
+ static {
+ Set<String> sysClss = new HashSet<>();
+
+ sysClss.add(GridClosureProcessor.C1V2.class.getName());
+ sysClss.add(GridClosureProcessor.C1MLAV2.class.getName());
+ sysClss.add(GridClosureProcessor.C2V2.class.getName());
+ sysClss.add(GridClosureProcessor.C2MLAV2.class.getName());
+ sysClss.add(GridClosureProcessor.C4V2.class.getName());
+ sysClss.add(GridClosureProcessor.C4MLAV2.class.getName());
+
+ BINARYLIZABLE_SYS_CLSS = Collections.unmodifiableSet(sysClss);
+ }
+
/** */
private final ConcurrentMap<Class<?>, BinaryClassDescriptor> descByCls = new ConcurrentHashMap8<>();
@@ -255,7 +273,7 @@ public class BinaryContext {
/**
* @return Ignite configuration.
*/
- public IgniteConfiguration configuration(){
+ public IgniteConfiguration configuration() {
return igniteCfg;
}
@@ -587,6 +605,11 @@ public class BinaryContext {
String clsName = cls.getName();
if (marshCtx.isSystemType(clsName)) {
+ BinarySerializer serializer = null;
+
+ if (BINARYLIZABLE_SYS_CLSS.contains(clsName))
+ serializer = new BinaryReflectiveSerializer();
+
desc = new BinaryClassDescriptor(this,
cls,
false,
@@ -594,7 +617,7 @@ public class BinaryContext {
clsName,
null,
SIMPLE_NAME_LOWER_CASE_MAPPER,
- null,
+ serializer,
false,
true /* registered */
);
@@ -775,7 +798,7 @@ public class BinaryContext {
if (prevMap != null && !mapper.equals(prevMap))
throw new IgniteException("Different mappers [clsName=" + clsName + ", newMapper=" + mapper
- + ", prevMap=" + prevMap + "]");
+ + ", prevMap=" + prevMap + "]");
prevMap = typeId2Mapper.putIfAbsent(mapper.typeId(clsName), mapper);
http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java
index 8c8bf27..af33b63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java
@@ -619,6 +619,7 @@ public abstract class BinaryFieldAccessor {
case BINARY:
case OBJECT:
+ case PROXY:
writer.writeObjectField(val);
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 043f754..c6883dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -31,6 +31,12 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobMasterLeaveAware;
@@ -61,6 +67,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.GridWorkerFuture;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.resources.LoadBalancerResource;
@@ -76,6 +83,9 @@ import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKe
*
*/
public class GridClosureProcessor extends GridProcessorAdapter {
+ /** Ignite version in which binarylizable versions of closures were introduced. */
+ public static final IgniteProductVersion BINARYLIZABLE_CLOSURES_SINCE = IgniteProductVersion.fromString("1.6.0");
+
/** */
private final Executor sysPool;
@@ -254,7 +264,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
case BROADCAST: {
for (ClusterNode n : nodes)
for (Runnable r : jobs)
- mapper.map(job(r), n);
+ mapper.map(downgradeJobIfNeeded(job(r), n), n);
break;
}
@@ -263,7 +273,9 @@ public class GridClosureProcessor extends GridProcessorAdapter {
for (Runnable r : jobs) {
ComputeJob job = job(r);
- mapper.map(job, lb.getBalancedNode(job, null));
+ ClusterNode n = lb.getBalancedNode(job, null);
+
+ mapper.map(downgradeJobIfNeeded(job, n), n);
}
break;
@@ -306,7 +318,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
case BROADCAST: {
for (ClusterNode n : nodes)
for (Callable<R> c : jobs)
- mapper.map(job(c), n);
+ mapper.map(downgradeJobIfNeeded(job(c), n), n);
break;
}
@@ -315,7 +327,9 @@ public class GridClosureProcessor extends GridProcessorAdapter {
for (Callable<R> c : jobs) {
ComputeJob job = job(c);
- mapper.map(job, lb.getBalancedNode(job, null));
+ ClusterNode n = lb.getBalancedNode(job, null);
+
+ mapper.map(downgradeJobIfNeeded(job, n), n);
}
break;
@@ -1025,7 +1039,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
private static <T, R> ComputeJob job(final IgniteClosure<T, R> job, @Nullable final T arg) {
A.notNull(job, "job");
- return job instanceof ComputeJobMasterLeaveAware ? new C1MLA<>(job, arg) : new C1<>(job, arg);
+ return job instanceof ComputeJobMasterLeaveAware ? new C1MLAV2<>(job, arg) : new C1V2<>(job, arg);
}
/**
@@ -1037,7 +1051,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
private static <R> ComputeJob job(final Callable<R> c) {
A.notNull(c, "job");
- return c instanceof ComputeJobMasterLeaveAware ? new C2MLA<>(c) : new C2<>(c);
+ return c instanceof ComputeJobMasterLeaveAware ? new C2MLAV2<>(c) : new C2V2<>(c);
}
/**
@@ -1049,7 +1063,46 @@ public class GridClosureProcessor extends GridProcessorAdapter {
private static ComputeJob job(final Runnable r) {
A.notNull(r, "job");
- return r instanceof ComputeJobMasterLeaveAware ? new C4MLA(r) : new C4(r);
+ return r instanceof ComputeJobMasterLeaveAware ? new C4MLAV2(r) : new C4V2(r);
+ }
+
+ /**
+ * Downgrades provided job to older version if target does not support it.
+ *
+ * @param job Job.
+ * @param node Node.
+ * @return Provided or downgraded job.
+ */
+ private static ComputeJob downgradeJobIfNeeded(ComputeJob job, ClusterNode node) {
+ A.notNull(job, "job");
+
+ assert node != null;
+
+ IgniteProductVersion nodeVer = node.version();
+
+ if (nodeVer.compareTo(BINARYLIZABLE_CLOSURES_SINCE) >= 0)
+ return job;
+
+ if (job instanceof C1V2) {
+ if (job instanceof C1MLAV2)
+ return new C1MLA<>(((C1MLAV2)job).job, ((C1MLAV2)job).arg);
+ else
+ return new C1<>(((C1V2)job).job, ((C1V2)job).arg);
+ }
+ else if (job instanceof C2V2) {
+ if (job instanceof C2MLAV2)
+ return new C2MLA<>(((C2MLAV2)job).c);
+ else
+ return new C2<>(((C2V2)job).c);
+ }
+ else if (job instanceof C4V2) {
+ if (job instanceof C4MLAV2)
+ return new C4MLA(((C4MLAV2)job).r);
+ else
+ return new C4(((C4V2)job).r);
+ }
+
+ return job;
}
/**
@@ -1294,9 +1347,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
- ComputeJob job = job(this.job);
-
- return Collections.singletonMap(job, node);
+ return Collections.singletonMap(downgradeJobIfNeeded(job(this.job), node), node);
}
/** {@inheritDoc} */
@@ -1348,9 +1399,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
- ComputeJob job = job(this.job);
-
- return Collections.singletonMap(job, node);
+ return Collections.singletonMap(downgradeJobIfNeeded(job(this.job), node), node);
}
/** {@inheritDoc} */
@@ -1488,7 +1537,9 @@ public class GridClosureProcessor extends GridProcessorAdapter {
@Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
ComputeJob job = job(this.job, this.arg);
- return Collections.singletonMap(job, lb.getBalancedNode(job, null));
+ ClusterNode node = lb.getBalancedNode(job, null);
+
+ return Collections.singletonMap(downgradeJobIfNeeded(job, node), node);
}
/** {@inheritDoc} */
@@ -1537,7 +1588,9 @@ public class GridClosureProcessor extends GridProcessorAdapter {
for (T jobArg : args) {
ComputeJob job = job(this.job, jobArg);
- mapper.map(job, lb.getBalancedNode(job, null));
+ ClusterNode node = lb.getBalancedNode(job, null);
+
+ mapper.map(downgradeJobIfNeeded(job, node), node);
}
return mapper.map();
@@ -1593,7 +1646,9 @@ public class GridClosureProcessor extends GridProcessorAdapter {
for (T jobArg : args) {
ComputeJob job = job(this.job, jobArg);
- mapper.map(job, lb.getBalancedNode(job, null));
+ ClusterNode node = lb.getBalancedNode(job, null);
+
+ mapper.map(downgradeJobIfNeeded(job, node), node);
}
return mapper.map();
@@ -1607,7 +1662,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
ComputeJobResultPolicy resPlc = super.result(res, rcvd);
- if (res.getException() == null && resPlc != FAILOVER && !rdc.collect((R1) res.getData()))
+ if (res.getException() == null && resPlc != FAILOVER && !rdc.collect((R1)res.getData()))
resPlc = REDUCE; // If reducer returned false - reduce right away.
return resPlc;
@@ -1647,7 +1702,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
JobMapper mapper = new JobMapper(subgrid.size());
for (ClusterNode n : subgrid)
- mapper.map(job(job, arg), n);
+ mapper.map(downgradeJobIfNeeded(job(job, arg), n), n);
return mapper.map();
}
@@ -1680,7 +1735,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
- public C1(){
+ public C1() {
// No-op.
}
@@ -1729,6 +1784,72 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
+ public static class C1V2<T, R> implements ComputeJob, Binarylizable, GridNoImplicitInjection,
+ GridInternalWrapper<IgniteClosure> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ protected IgniteClosure<T, R> job;
+
+ /** */
+ @GridToStringInclude
+ protected T arg;
+
+ /**
+ *
+ */
+ public C1V2() {
+ // No-op.
+ }
+
+ /**
+ * @param job Job.
+ * @param arg Argument.
+ */
+ C1V2(IgniteClosure<T, R> job, T arg) {
+ this.job = job;
+ this.arg = arg;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object execute() {
+ return job.apply(arg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ // No-op.
+ }
+
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ BinaryRawWriter rawWriter = writer.rawWriter();
+
+ rawWriter.writeObject(job);
+ rawWriter.writeObject(arg);
+ }
+
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ BinaryRawReader rawReader = reader.rawReader();
+
+ job = rawReader.readObject();
+ arg = rawReader.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteClosure userObject() {
+ return job;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(C1V2.class, this);
+ }
+ }
+
+ /**
+ *
+ */
private static class C1MLA<T, R> extends C1<T, R> implements ComputeJobMasterLeaveAware {
/** */
private static final long serialVersionUID = 0L;
@@ -1762,6 +1883,39 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
+ public static class C1MLAV2<T, R> extends C1V2<T, R> implements ComputeJobMasterLeaveAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ *
+ */
+ public C1MLAV2() {
+ // No-op.
+ }
+
+ /**
+ * @param job Job.
+ * @param arg Argument.
+ */
+ private C1MLAV2(IgniteClosure<T, R> job, T arg) {
+ super(job, arg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
+ ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(C1MLAV2.class, this, super.toString());
+ }
+ }
+
+ /**
+ *
+ */
private static class C2<R> implements ComputeJob, Externalizable, GridNoImplicitInjection, GridInternalWrapper<Callable> {
/** */
private static final long serialVersionUID = 0L;
@@ -1772,7 +1926,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
- public C2(){
+ public C2() {
// No-op.
}
@@ -1822,7 +1976,66 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
- private static class C2MLA<R> extends C2<R> implements ComputeJobMasterLeaveAware{
+ public static class C2V2<R> implements ComputeJob, Binarylizable, GridNoImplicitInjection,
+ GridInternalWrapper<Callable> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ protected Callable<R> c;
+
+ /**
+ *
+ */
+ public C2V2() {
+ // No-op.
+ }
+
+ /**
+ * @param c Callable.
+ */
+ private C2V2(Callable<R> c) {
+ this.c = c;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object execute() {
+ try {
+ return c.call();
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ // No-op.
+ }
+
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ writer.rawWriter().writeObject(c);
+ }
+
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ c = reader.rawReader().readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Callable userObject() {
+ return c;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(C2V2.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class C2MLA<R> extends C2<R> implements ComputeJobMasterLeaveAware {
/** */
private static final long serialVersionUID = 0L;
@@ -1852,6 +2065,38 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
+ *
+ */
+ public static class C2MLAV2<R> extends C2V2<R> implements ComputeJobMasterLeaveAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ *
+ */
+ public C2MLAV2() {
+ // No-op.
+ }
+
+ /**
+ * @param c Callable.
+ */
+ private C2MLAV2(Callable<R> c) {
+ super(c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
+ ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(C2MLAV2.class, this, super.toString());
+ }
+ }
+
+ /**
*/
private static class C4 implements ComputeJob, Externalizable, GridNoImplicitInjection, GridInternalWrapper<Runnable> {
/** */
@@ -1863,7 +2108,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
- public C4(){
+ public C4() {
// No-op.
}
@@ -1908,6 +2153,60 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
+ */
+ public static class C4V2 implements ComputeJob, Binarylizable, GridNoImplicitInjection, GridInternalWrapper<Runnable> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ protected Runnable r;
+
+ /**
+ *
+ */
+ public C4V2() {
+ // No-op.
+ }
+
+ /**
+ * @param r Runnable.
+ */
+ private C4V2(Runnable r) {
+ this.r = r;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object execute() {
+ r.run();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ // No-op.
+ }
+
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ writer.rawWriter().writeObject(r);
+ }
+
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ r = reader.rawReader().readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Runnable userObject() {
+ return r;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(C4V2.class, this);
+ }
+ }
+
+ /**
*
*/
private static class C4MLA extends C4 implements ComputeJobMasterLeaveAware {
@@ -1938,4 +2237,36 @@ public class GridClosureProcessor extends GridProcessorAdapter {
return S.toString(C4MLA.class, this, super.toString());
}
}
+
+ /**
+ *
+ */
+ public static class C4MLAV2 extends C4V2 implements ComputeJobMasterLeaveAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ *
+ */
+ public C4MLAV2() {
+ // No-op.
+ }
+
+ /**
+ * @param r Runnable.
+ */
+ private C4MLAV2(Runnable r) {
+ super(r);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMasterNodeLeft(ComputeTaskSession ses) {
+ ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(C4MLAV2.class, this, super.toString());
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 8c3ad88..9728d9c 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -900,11 +900,17 @@ org.apache.ignite.internal.processors.clock.GridClockDeltaSnapshotMessage
org.apache.ignite.internal.processors.clock.GridClockDeltaVersion
org.apache.ignite.internal.processors.closure.GridClosurePolicy
org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1
+org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1V2
org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1MLA
+org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1MLAV2
org.apache.ignite.internal.processors.closure.GridClosureProcessor$C2
+org.apache.ignite.internal.processors.closure.GridClosureProcessor$C2V2
org.apache.ignite.internal.processors.closure.GridClosureProcessor$C2MLA
+org.apache.ignite.internal.processors.closure.GridClosureProcessor$C2MLAV2
org.apache.ignite.internal.processors.closure.GridClosureProcessor$C4
+org.apache.ignite.internal.processors.closure.GridClosureProcessor$C4V2
org.apache.ignite.internal.processors.closure.GridClosureProcessor$C4MLA
+org.apache.ignite.internal.processors.closure.GridClosureProcessor$C4MLAV2
org.apache.ignite.internal.processors.closure.GridClosureProcessor$T1
org.apache.ignite.internal.processors.closure.GridClosureProcessor$T10
org.apache.ignite.internal.processors.closure.GridClosureProcessor$T11
http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java
index a75023f..9e438e9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java
@@ -84,13 +84,15 @@ public class GridAffinitySelfTest extends GridCommonAbstractTest {
/**
* @throws IgniteCheckedException If failed.
*/
- public void testAffinity() throws IgniteCheckedException {
+ public void testAffinity() throws Exception {
Ignite g1 = grid(1);
Ignite g2 = grid(2);
assert caches(g1).size() == 0;
assert F.first(caches(g2)).getCacheMode() == PARTITIONED;
+ awaitPartitionMapExchange();
+
Map<ClusterNode, Collection<String>> map = g1.<String>affinity(null).mapKeysToNodes(F.asList("1"));
assertNotNull(map);
http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/test/java/org/apache/ignite/internal/GridComputationBinarylizableClosuresSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridComputationBinarylizableClosuresSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridComputationBinarylizableClosuresSelfTest.java
new file mode 100644
index 0000000..96f0277
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridComputationBinarylizableClosuresSelfTest.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.compute.ComputeJobMasterLeaveAware;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test ensuring that correct closures are serialized.
+ */
+public class GridComputationBinarylizableClosuresSelfTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setMarshaller(new BinaryMarshaller());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ TestBinarylizableClosure.writeCalled.set(false);
+ TestBinarylizableClosure.readCalled.set(false);
+ TestBinarylizableClosure.executed.set(false);
+
+ TestBinarylizableMasterLeaveAwareClosure.writeCalled.set(false);
+ TestBinarylizableMasterLeaveAwareClosure.readCalled.set(false);
+
+ TestBinarylizableCallable.writeCalled.set(false);
+ TestBinarylizableCallable.readCalled.set(false);
+ TestBinarylizableCallable.executed.set(false);
+
+ TestBinarylizableMasterLeaveAwareCallable.writeCalled.set(false);
+ TestBinarylizableMasterLeaveAwareCallable.readCalled.set(false);
+
+ TestBinarylizableRunnable.writeCalled.set(false);
+ TestBinarylizableRunnable.readCalled.set(false);
+ TestBinarylizableRunnable.executed.set(false);
+
+ TestBinarylizableMasterLeaveAwareRunnable.writeCalled.set(false);
+ TestBinarylizableMasterLeaveAwareRunnable.readCalled.set(false);
+
+ TestBinarylizableObject.writeCalled.set(false);
+ TestBinarylizableObject.readCalled.set(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Test that Binarylizable IgniteClosure is serialized using BinaryMarshaller.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJob() throws Exception {
+ Ignite ignite = startGrid(1);
+ startGrid(2);
+
+ final TestBinarylizableClosure closure = new TestBinarylizableClosure();
+
+ ignite.compute(ignite.cluster().forRemotes()).apply(closure, new TestBinarylizableObject());
+
+ assert TestBinarylizableClosure.executed.get();
+ assert TestBinarylizableClosure.writeCalled.get();
+ assert TestBinarylizableClosure.readCalled.get();
+
+ assert TestBinarylizableObject.writeCalled.get();
+ assert TestBinarylizableObject.readCalled.get();
+ }
+
+ /**
+ * Test that Binarylizable IgniteClosure with ComputeJobMasterLeaveAware interface is serialized
+ * using BinaryMarshaller.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMasterLeaveAwareJob() throws Exception {
+ Ignite ignite = startGrid(1);
+ startGrid(2);
+
+ final TestBinarylizableMasterLeaveAwareClosure job = new TestBinarylizableMasterLeaveAwareClosure();
+
+ ignite.compute(ignite.cluster().forRemotes()).apply(job, new TestBinarylizableObject());
+
+ assert TestBinarylizableClosure.executed.get();
+ assert TestBinarylizableClosure.writeCalled.get();
+ assert TestBinarylizableClosure.readCalled.get();
+
+ assert TestBinarylizableMasterLeaveAwareClosure.writeCalled.get();
+ assert TestBinarylizableMasterLeaveAwareClosure.readCalled.get();
+
+ assert TestBinarylizableObject.writeCalled.get();
+ assert TestBinarylizableObject.readCalled.get();
+ }
+
+ /**
+ * Test that Binarylizable IgniteCallable is serialized using BinaryMarshaller.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCallable() throws Exception {
+ Ignite ignite = startGrid(1);
+ startGrid(2);
+
+ final TestBinarylizableCallable callable = new TestBinarylizableCallable();
+
+ ignite.compute(ignite.cluster().forRemotes()).call(callable);
+
+ assert TestBinarylizableCallable.executed.get();
+ assert TestBinarylizableCallable.writeCalled.get();
+ assert TestBinarylizableCallable.readCalled.get();
+ }
+
+ /**
+ * Test that Binarylizable IgniteCallable with ComputeJobMasterLeaveAware interface is serialized
+ * using BinaryMarshaller.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMasterLeaveAwareCallable() throws Exception {
+ Ignite ignite = startGrid(1);
+ startGrid(2);
+
+ final TestBinarylizableMasterLeaveAwareCallable callable = new TestBinarylizableMasterLeaveAwareCallable();
+
+ ignite.compute(ignite.cluster().forRemotes()).call(callable);
+
+ assert TestBinarylizableCallable.executed.get();
+ assert TestBinarylizableCallable.writeCalled.get();
+ assert TestBinarylizableCallable.readCalled.get();
+
+ assert TestBinarylizableMasterLeaveAwareCallable.writeCalled.get();
+ assert TestBinarylizableMasterLeaveAwareCallable.readCalled.get();
+ }
+
+ /**
+ * Test that Binarylizable IgniteRunnable is serialized using BinaryMarshaller.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRunnable() throws Exception {
+ Ignite ignite = startGrid(1);
+ startGrid(2);
+
+ final TestBinarylizableRunnable runnable = new TestBinarylizableRunnable();
+
+ ignite.compute(ignite.cluster().forRemotes()).run(runnable);
+
+ assert TestBinarylizableRunnable.executed.get();
+ assert TestBinarylizableRunnable.writeCalled.get();
+ assert TestBinarylizableRunnable.readCalled.get();
+ }
+
+ /**
+ * Test that Binarylizable IgniteRunnable with ComputeJobMasterLeaveAware interface is serialized
+ * using BinaryMarshaller.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMasterLeaveAwareRunnable() throws Exception {
+ Ignite ignite = startGrid(1);
+ startGrid(2);
+
+ final TestBinarylizableMasterLeaveAwareRunnable runnable = new TestBinarylizableMasterLeaveAwareRunnable();
+
+ ignite.compute(ignite.cluster().forRemotes()).run(runnable);
+
+ assert TestBinarylizableRunnable.executed.get();
+ assert TestBinarylizableRunnable.writeCalled.get();
+ assert TestBinarylizableRunnable.readCalled.get();
+
+ assert TestBinarylizableMasterLeaveAwareRunnable.writeCalled.get();
+ assert TestBinarylizableMasterLeaveAwareRunnable.readCalled.get();
+ }
+
+ /**
+ * Test Binarylizable IgniteClosure.
+ */
+ private static class TestBinarylizableClosure implements IgniteClosure, Binarylizable {
+
+ /** Tracks {@link TestBinarylizableClosure::writeBinary(BinaryWriter writer)} calls. */
+ private static AtomicBoolean writeCalled = new AtomicBoolean();
+
+ /** Tracks {@link TestBinarylizableClosure::readBinary(BinaryReader reader)} calls. */
+ private static AtomicBoolean readCalled = new AtomicBoolean();
+
+ /** Tracks {@link TestBinarylizableClosure::apply(Object o)} calls. */
+ private static AtomicBoolean executed = new AtomicBoolean();
+
+ /** {@inheritDoc} */
+ @Override public Object apply(Object o) {
+ executed.set(true);
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ writeCalled.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ readCalled.set(true);
+ }
+ }
+
+ /**
+ * Test Binarylizable IgniteClosure with ComputeJobMasterLeaveAware interface.
+ */
+ private static class TestBinarylizableMasterLeaveAwareClosure extends TestBinarylizableClosure
+ implements ComputeJobMasterLeaveAware {
+
+ /** Tracks {@link TestBinarylizableMasterLeaveAwareClosure::writeBinary(BinaryWriter writer)} calls. */
+ private static AtomicBoolean writeCalled = new AtomicBoolean();
+
+ /** Tracks {@link TestBinarylizableMasterLeaveAwareClosure::readBinary(BinaryReader reader)} calls. */
+ private static AtomicBoolean readCalled = new AtomicBoolean();
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ super.writeBinary(writer);
+ writeCalled.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ super.readBinary(reader);
+ readCalled.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException {
+ }
+ }
+
+ /**
+ * Test Binarylizable object.
+ */
+ private static class TestBinarylizableObject implements Binarylizable {
+
+ /** Tracks {@link TestBinarylizableObject::writeBinary(BinaryWriter writer)} calls. */
+ private static AtomicBoolean writeCalled = new AtomicBoolean();
+
+ /** Tracks {@link TestBinarylizableObject::readBinary(BinaryReader reader)} calls. */
+ private static AtomicBoolean readCalled = new AtomicBoolean();
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ writeCalled.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ readCalled.set(true);
+ }
+ }
+
+ /**
+ * Test Binarylizable Callable.
+ */
+ private static class TestBinarylizableCallable implements IgniteCallable, Binarylizable {
+
+ /** Tracks {@link TestBinarylizableCallable::writeBinary(BinaryWriter writer)} calls. */
+ private static AtomicBoolean writeCalled = new AtomicBoolean();
+
+ /** Tracks {@link TestBinarylizableCallable::readBinary(BinaryReader reader)} calls. */
+ private static AtomicBoolean readCalled = new AtomicBoolean();
+
+ /** Tracks {@link TestBinarylizableCallable::call()} calls. */
+ private static AtomicBoolean executed = new AtomicBoolean();
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ executed.set(true);
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ writeCalled.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ readCalled.set(true);
+ }
+ }
+
+ /**
+ * Test Binarylizable Callable with ComputeJobMasterLeaveAware interface.
+ */
+ private static class TestBinarylizableMasterLeaveAwareCallable extends TestBinarylizableCallable
+ implements ComputeJobMasterLeaveAware {
+
+ /** Tracks {@link TestBinarylizableMasterLeaveAwareCallable::writeBinary(BinaryWriter writer)} calls. */
+ private static AtomicBoolean writeCalled = new AtomicBoolean();
+
+ /** Tracks {@link TestBinarylizableMasterLeaveAwareCallable::readBinary(BinaryReader reader)} calls. */
+ private static AtomicBoolean readCalled = new AtomicBoolean();
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ super.writeBinary(writer);
+ writeCalled.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ super.readBinary(reader);
+ readCalled.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException {
+ }
+ }
+
+ /**
+ * Test Binarylizable Runnable.
+ */
+ private static class TestBinarylizableRunnable implements IgniteRunnable, Binarylizable {
+
+ /** Tracks {@link TestBinarylizableRunnable::writeBinary(BinaryWriter writer)} calls. */
+ private static AtomicBoolean writeCalled = new AtomicBoolean();
+
+ /** Tracks {@link TestBinarylizableRunnable::readBinary(BinaryReader reader)} calls. */
+ private static AtomicBoolean readCalled = new AtomicBoolean();
+
+ /** Tracks {@link TestBinarylizableRunnable::run()} calls. */
+ private static AtomicBoolean executed = new AtomicBoolean();
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ executed.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ writeCalled.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ readCalled.set(true);
+ }
+ }
+
+ /**
+ * Test Binarylizable Runnable with ComputeJobMasterLeaveAware interface.
+ */
+ private static class TestBinarylizableMasterLeaveAwareRunnable extends TestBinarylizableRunnable
+ implements ComputeJobMasterLeaveAware {
+
+ /** Tracks {@link TestBinarylizableMasterLeaveAwareRunnable::writeBinary(BinaryWriter writer)} calls. */
+ private static AtomicBoolean writeCalled = new AtomicBoolean();
+
+ /** Tracks {@link TestBinarylizableMasterLeaveAwareRunnable::readBinary(BinaryReader reader)} calls. */
+ private static AtomicBoolean readCalled = new AtomicBoolean();
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ super.writeBinary(writer);
+ writeCalled.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ super.readBinary(reader);
+ readCalled.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException {
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
index 37b908a..eefe66c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
@@ -2690,6 +2690,35 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest {
}
/**
+ * Test object with {@link Proxy} field.
+ *
+ * @throws Exception If fails.
+ */
+ public void testObjectContainingProxy() throws Exception {
+ BinaryMarshaller marsh = binaryMarshaller();
+
+ SomeItf inItf = (SomeItf)Proxy.newProxyInstance(
+ BinaryMarshallerSelfTest.class.getClassLoader(), new Class[] {SomeItf.class},
+ new InvocationHandler() {
+ private NonSerializable obj = new NonSerializable(null);
+
+ @Override public Object invoke(Object proxy, Method mtd, Object[] args) throws Throwable {
+ if ("hashCode".equals(mtd.getName()))
+ return obj.hashCode();
+
+ obj.checkAfterUnmarshalled();
+
+ return 17;
+ }
+ }
+ );
+
+ SomeItf outItf = marsh.unmarshal(marsh.marshal(inItf), null);
+
+ assertEquals(outItf.checkAfterUnmarshalled(), 17);
+ }
+
+ /**
* Test duplicate fields.
*
* @throws Exception If failed.
@@ -4676,4 +4705,25 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest {
rawValArr = rawReader.readDecimalArray();
}
}
+
+ /**
+ * Wrapper object.
+ */
+ private static class Wrapper {
+
+ /** Value. */
+ private final Object value;
+
+ /** Constructor. */
+ public Wrapper(Object value) {
+ this.value = value;
+ }
+
+ /**
+ * @return Value.
+ */
+ public Object getValue() {
+ return value;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsComputeGridTestSuite.java
index 35be98d..e659966 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsComputeGridTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.internal.GridComputationBinarylizableClosuresSelfTest;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.testframework.config.GridTestProperties;
@@ -32,6 +33,10 @@ public class IgniteBinaryObjectsComputeGridTestSuite {
public static TestSuite suite() throws Exception {
GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());
- return IgniteComputeGridTestSuite.suite();
+ TestSuite suite = IgniteComputeGridTestSuite.suite();
+
+ suite.addTestSuite(GridComputationBinarylizableClosuresSelfTest.class);
+
+ return suite;
}
}