You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/07 11:17:17 UTC
[32/50] [abbrv] ignite git commit: WIP
WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/66917b3f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/66917b3f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/66917b3f
Branch: refs/heads/ignite-2649
Commit: 66917b3f2ae6252c0e65d7c0957491472a9e434b
Parents: 6e686a2
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 7 12:25:56 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 7 12:25:56 2016 +0300
----------------------------------------------------------------------
.../processors/closure/GridClosureProcessor.java | 11 ++++-------
.../datastreamer/DataStreamProcessor.java | 11 ++++-------
.../processors/datastreamer/DataStreamerImpl.java | 8 ++++----
.../processors/service/GridServiceProcessor.java | 15 ++++++++-------
.../processors/task/GridTaskProcessor.java | 10 ++++------
.../internal/processors/task/GridTaskWorker.java | 17 ++++++++---------
6 files changed, 32 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/66917b3f/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index fb5c2c8..1a96954 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -70,7 +70,6 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.LoadBalancerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -1145,18 +1144,16 @@ public class GridClosureProcessor extends GridProcessorAdapter {
if (closureBytes == null) {
closure = c.job;
- closureBytes = MarshallerUtils.marshal(ctx, c.job);
+ closureBytes = marsh.marshal(c.job);
}
if (c.job == closure)
- c.job = MarshallerUtils.unmarshal(ctx.gridName(), marsh, closureBytes, U.resolveClassLoader(ctx.config()));
+ c.job = marsh.unmarshal(closureBytes, U.resolveClassLoader(ctx.config()));
else
- c.job = MarshallerUtils.marshalUnmarshal(ctx.gridName(), marsh, c.job,
- U.resolveClassLoader(ctx.config()));
+ c.job = marsh.unmarshal(marsh.marshal(c.job), U.resolveClassLoader(ctx.config()));
}
else
- job = MarshallerUtils.marshalUnmarshal(ctx.gridName(), marsh, job,
- U.resolveClassLoader(ctx.config()));
+ job = marsh.unmarshal(marsh.marshal(job), U.resolveClassLoader(ctx.config()));
}
else
hadLocNode = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/66917b3f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 853d1a0..c7c1f5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -91,7 +90,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
if (ctx.config().isDaemon())
return;
- marshErrBytes = MarshallerUtils.marshal(ctx, new IgniteCheckedException("Failed to marshal response error, " +
+ marshErrBytes = marsh.marshal(new IgniteCheckedException("Failed to marshal response error, " +
"see node log for details."));
flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
@@ -236,8 +235,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
Object topic;
try {
- topic = MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.responseTopicBytes(),
- U.resolveClassLoader(null, ctx.config()));
+ topic = marsh.unmarshal(req.responseTopicBytes(), U.resolveClassLoader(null, ctx.config()));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal topic from request: " + req, e);
@@ -277,8 +275,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
StreamReceiver<K, V> updater;
try {
- updater = MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.updaterBytes(),
- U.resolveClassLoader(clsLdr, ctx.config()));
+ updater = marsh.unmarshal(req.updaterBytes(), U.resolveClassLoader(clsLdr, ctx.config()));
if (updater != null)
ctx.resource().injectGeneric(updater);
@@ -332,7 +329,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
byte[] errBytes;
try {
- errBytes = err != null ? MarshallerUtils.marshal(ctx, err) : null;
+ errBytes = err != null ? marsh.marshal(err) : null;
}
catch (Exception e) {
U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/66917b3f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 9d04a5f..21df559 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -98,7 +98,6 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.stream.StreamReceiver;
import org.jetbrains.annotations.Nullable;
@@ -1360,11 +1359,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
if (updaterBytes == null) {
assert rcvr != null;
- updaterBytes = MarshallerUtils.marshal(ctx, rcvr);
+ updaterBytes = ctx.config().getMarshaller().marshal(rcvr);
}
if (topicBytes == null)
- topicBytes = MarshallerUtils.marshal(ctx, topic);
+ topicBytes = ctx.config().getMarshaller().marshal(topic);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal (request will not be sent).", e);
@@ -1495,7 +1494,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
try {
GridPeerDeployAware jobPda0 = jobPda;
- err = MarshallerUtils.unmarshal(ctx.gridName(), ctx.config().getMarshaller(), errBytes,
+ err = ctx.config().getMarshaller().unmarshal(
+ errBytes,
U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()));
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/66917b3f/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 654114d..b418ba2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -83,7 +83,6 @@ import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.services.Service;
@@ -448,16 +447,18 @@ public class GridServiceProcessor extends GridProcessorAdapter {
validate(cfg);
if (!state.srvcCompatibility) {
+ Marshaller marsh = ctx.config().getMarshaller();
+
LazyServiceConfiguration cfg0;
try {
- byte[] srvcBytes = MarshallerUtils.marshal(ctx, cfg.getService());
+ byte[] srvcBytes = marsh.marshal(cfg.getService());
cfg0 = new LazyServiceConfiguration(cfg, srvcBytes);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal service with configured marshaller [srvc=" + cfg.getService()
- + ", marsh=" + ctx.config().getMarshaller() + "]", e);
+ + ", marsh=" + marsh + "]", e);
return new GridFinishedFuture<>(e);
}
@@ -1122,7 +1123,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (cfg instanceof LazyServiceConfiguration) {
byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes();
- Service srvc = MarshallerUtils.unmarshal(ctx.gridName(), m, bytes, U.resolveClassLoader(null, ctx.config()));
+ Service srvc = m.unmarshal(bytes, U.resolveClassLoader(null, ctx.config()));
ctx.resource().inject(srvc);
@@ -1132,10 +1133,10 @@ public class GridServiceProcessor extends GridProcessorAdapter {
Service svc = cfg.getService();
try {
- byte[] bytes = MarshallerUtils.marshal(ctx, svc);
+ byte[] bytes = m.marshal(svc);
- Service cp = MarshallerUtils.unmarshal(ctx.gridName(), m, bytes, U.resolveClassLoader(svc.getClass().getClassLoader(),
- ctx.config()));
+ Service cp = m.unmarshal(bytes,
+ U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
ctx.resource().inject(cp);
http://git-wip-us.apache.org/repos/asf/ignite/blob/66917b3f/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 9633a65..0c3cf0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -71,7 +71,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.jetbrains.annotations.Nullable;
import org.jsr166.LongAdder8;
@@ -914,7 +913,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
GridTaskSessionRequest req = new GridTaskSessionRequest(
ses.getId(),
null,
- loc ? null : MarshallerUtils.marshal(ctx, attrs),
+ loc ? null : marsh.marshal(attrs),
attrs);
// Make sure to go through IO manager always, since order
@@ -1030,7 +1029,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
boolean loc = ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs();
Map<?, ?> attrs = loc ? msg.getAttributes() :
- MarshallerUtils.<Map<?, ?>>unmarshal(ctx.gridName(), marsh, msg.getAttributesBytes(),
+ marsh.<Map<?, ?>>unmarshal(msg.getAttributesBytes(),
U.resolveClassLoader(task.getTask().getClass().getClassLoader(), ctx.config()));
GridTaskSessionImpl ses = task.getSession();
@@ -1306,8 +1305,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
if (topic == null) {
assert req.topicBytes() != null;
- topic = MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.topicBytes(),
- U.resolveClassLoader(ctx.config()));
+ topic = marsh.unmarshal(req.topicBytes(), U.resolveClassLoader(ctx.config()));
}
boolean loc = ctx.localNodeId().equals(nodeId);
@@ -1315,7 +1313,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
ctx.io().send(nodeId, topic,
new GridJobSiblingsResponse(
loc ? siblings : null,
- loc ? null : MarshallerUtils.marshal(ctx, siblings)),
+ loc ? null : marsh.marshal(siblings)),
SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/66917b3f/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 10942da..8ce005a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -81,7 +81,6 @@ import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.TaskContinuousMapperResource;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedDeque8;
@@ -775,15 +774,15 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
try {
boolean loc = ctx.localNodeId().equals(res.getNodeId()) && !ctx.config().isMarshalLocalJobs();
- Object res0 = loc ? res.getJobResult() : MarshallerUtils.unmarshal(ctx.gridName(), marsh,
- res.getJobResultBytes(), U.resolveClassLoader(clsLdr, ctx.config()));
+ Object res0 = loc ? res.getJobResult() : marsh.unmarshal(res.getJobResultBytes(),
+ U.resolveClassLoader(clsLdr, ctx.config()));
IgniteException ex = loc ? res.getException() :
- MarshallerUtils.<IgniteException>unmarshal(ctx.gridName(), marsh, res.getExceptionBytes(),
+ marsh.<IgniteException>unmarshal(res.getExceptionBytes(),
U.resolveClassLoader(clsLdr, ctx.config()));
Map<Object, Object> attrs = loc ? res.getJobAttributes() :
- MarshallerUtils.<Map<Object, Object>>unmarshal(ctx.gridName(), marsh, res.getJobAttributesBytes(),
+ marsh.<Map<Object, Object>>unmarshal(res.getJobAttributesBytes(),
U.resolveClassLoader(clsLdr, ctx.config()));
jobRes.onResponse(res0, ex, attrs, res.isCancelled());
@@ -1254,16 +1253,16 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
ses.getTaskName(),
ses.getUserVersion(),
ses.getTaskClassName(),
- loc ? null : MarshallerUtils.marshal(ctx, res.getJob()),
+ loc ? null : marsh.marshal(res.getJob()),
loc ? res.getJob() : null,
ses.getStartTime(),
timeout,
ses.getTopology(),
- loc ? null : MarshallerUtils.marshal(ctx, ses.getJobSiblings()),
+ loc ? null : marsh.marshal(ses.getJobSiblings()),
loc ? ses.getJobSiblings() : null,
- loc ? null : MarshallerUtils.marshal(ctx, sesAttrs),
+ loc ? null : marsh.marshal(sesAttrs),
loc ? sesAttrs : null,
- loc ? null : MarshallerUtils.marshal(ctx, jobAttrs),
+ loc ? null : marsh.marshal(jobAttrs),
loc ? jobAttrs : null,
ses.getCheckpointSpi(),
dep.classLoaderId(),