You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/07 11:17:17 UTC

[32/50] [abbrv] ignite git commit: WIP

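WIP: replace MarshallerUtils helper calls (marshal / unmarshal / marshalUnmarshal) with direct Marshaller.marshal() and Marshaller.unmarshal() calls, and drop the now-unused MarshallerUtils imports.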


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/66917b3f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/66917b3f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/66917b3f

Branch: refs/heads/ignite-2649
Commit: 66917b3f2ae6252c0e65d7c0957491472a9e434b
Parents: 6e686a2
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 7 12:25:56 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 7 12:25:56 2016 +0300

----------------------------------------------------------------------
 .../processors/closure/GridClosureProcessor.java   | 11 ++++-------
 .../datastreamer/DataStreamProcessor.java          | 11 ++++-------
 .../processors/datastreamer/DataStreamerImpl.java  |  8 ++++----
 .../processors/service/GridServiceProcessor.java   | 15 ++++++++-------
 .../processors/task/GridTaskProcessor.java         | 10 ++++------
 .../internal/processors/task/GridTaskWorker.java   | 17 ++++++++---------
 6 files changed, 32 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/66917b3f/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index fb5c2c8..1a96954 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -70,7 +70,6 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.resources.LoadBalancerResource;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -1145,18 +1144,16 @@ public class GridClosureProcessor extends GridProcessorAdapter {
                         if (closureBytes == null) {
                             closure = c.job;
 
-                            closureBytes = MarshallerUtils.marshal(ctx, c.job);
+                            closureBytes = marsh.marshal(c.job);
                         }
 
                         if (c.job == closure)
-                            c.job = MarshallerUtils.unmarshal(ctx.gridName(), marsh, closureBytes, U.resolveClassLoader(ctx.config()));
+                            c.job = marsh.unmarshal(closureBytes, U.resolveClassLoader(ctx.config()));
                         else
-                            c.job = MarshallerUtils.marshalUnmarshal(ctx.gridName(), marsh, c.job,
-                                U.resolveClassLoader(ctx.config()));
+                            c.job = marsh.unmarshal(marsh.marshal(c.job), U.resolveClassLoader(ctx.config()));
                     }
                     else
-                        job = MarshallerUtils.marshalUnmarshal(ctx.gridName(), marsh, job,
-                            U.resolveClassLoader(ctx.config()));
+                        job = marsh.unmarshal(marsh.marshal(job), U.resolveClassLoader(ctx.config()));
                 }
                 else
                     hadLocNode = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/66917b3f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 853d1a0..c7c1f5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.stream.StreamReceiver;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
@@ -91,7 +90,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         if (ctx.config().isDaemon())
             return;
 
-        marshErrBytes = MarshallerUtils.marshal(ctx, new IgniteCheckedException("Failed to marshal response error, " +
+        marshErrBytes = marsh.marshal(new IgniteCheckedException("Failed to marshal response error, " +
             "see node log for details."));
 
         flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
@@ -236,8 +235,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
             Object topic;
 
             try {
-                topic = MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.responseTopicBytes(),
-                    U.resolveClassLoader(null, ctx.config()));
+                topic = marsh.unmarshal(req.responseTopicBytes(), U.resolveClassLoader(null, ctx.config()));
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to unmarshal topic from request: " + req, e);
@@ -277,8 +275,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
             StreamReceiver<K, V> updater;
 
             try {
-                updater = MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.updaterBytes(),
-                    U.resolveClassLoader(clsLdr, ctx.config()));
+                updater = marsh.unmarshal(req.updaterBytes(), U.resolveClassLoader(clsLdr, ctx.config()));
 
                 if (updater != null)
                     ctx.resource().injectGeneric(updater);
@@ -332,7 +329,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         byte[] errBytes;
 
         try {
-            errBytes = err != null ? MarshallerUtils.marshal(ctx, err) : null;
+            errBytes = err != null ? marsh.marshal(err) : null;
         }
         catch (Exception e) {
             U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/66917b3f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 9d04a5f..21df559 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -98,7 +98,6 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.stream.StreamReceiver;
 import org.jetbrains.annotations.Nullable;
@@ -1360,11 +1359,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     if (updaterBytes == null) {
                         assert rcvr != null;
 
-                        updaterBytes = MarshallerUtils.marshal(ctx, rcvr);
+                        updaterBytes = ctx.config().getMarshaller().marshal(rcvr);
                     }
 
                     if (topicBytes == null)
-                        topicBytes = MarshallerUtils.marshal(ctx, topic);
+                        topicBytes = ctx.config().getMarshaller().marshal(topic);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to marshal (request will not be sent).", e);
@@ -1495,7 +1494,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 try {
                     GridPeerDeployAware jobPda0 = jobPda;
 
-                    err = MarshallerUtils.unmarshal(ctx.gridName(), ctx.config().getMarshaller(), errBytes,
+                    err = ctx.config().getMarshaller().unmarshal(
+                        errBytes,
                         U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()));
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/66917b3f/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 654114d..b418ba2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -83,7 +83,6 @@ import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.resources.JobContextResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.services.Service;
@@ -448,16 +447,18 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         validate(cfg);
 
         if (!state.srvcCompatibility) {
+            Marshaller marsh = ctx.config().getMarshaller();
+
             LazyServiceConfiguration cfg0;
 
             try {
-                byte[] srvcBytes = MarshallerUtils.marshal(ctx, cfg.getService());
+                byte[] srvcBytes = marsh.marshal(cfg.getService());
 
                 cfg0 = new LazyServiceConfiguration(cfg, srvcBytes);
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to marshal service with configured marshaller [srvc=" + cfg.getService()
-                    + ", marsh=" + ctx.config().getMarshaller() + "]", e);
+                    + ", marsh=" + marsh + "]", e);
 
                 return new GridFinishedFuture<>(e);
             }
@@ -1122,7 +1123,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         if (cfg instanceof LazyServiceConfiguration) {
             byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes();
 
-            Service srvc = MarshallerUtils.unmarshal(ctx.gridName(), m, bytes, U.resolveClassLoader(null, ctx.config()));
+            Service srvc = m.unmarshal(bytes, U.resolveClassLoader(null, ctx.config()));
 
             ctx.resource().inject(srvc);
 
@@ -1132,10 +1133,10 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             Service svc = cfg.getService();
 
             try {
-                byte[] bytes = MarshallerUtils.marshal(ctx, svc);
+                byte[] bytes = m.marshal(svc);
 
-                Service cp = MarshallerUtils.unmarshal(ctx.gridName(), m, bytes, U.resolveClassLoader(svc.getClass().getClassLoader(),
-                    ctx.config()));
+                Service cp = m.unmarshal(bytes,
+                    U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
 
                 ctx.resource().inject(cp);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/66917b3f/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 9633a65..0c3cf0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -71,7 +71,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.LongAdder8;
@@ -914,7 +913,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
                     GridTaskSessionRequest req = new GridTaskSessionRequest(
                         ses.getId(),
                         null,
-                        loc ? null : MarshallerUtils.marshal(ctx, attrs),
+                        loc ? null : marsh.marshal(attrs),
                         attrs);
 
                     // Make sure to go through IO manager always, since order
@@ -1030,7 +1029,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
             boolean loc = ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs();
 
             Map<?, ?> attrs = loc ? msg.getAttributes() :
-                MarshallerUtils.<Map<?, ?>>unmarshal(ctx.gridName(), marsh, msg.getAttributesBytes(),
+                marsh.<Map<?, ?>>unmarshal(msg.getAttributesBytes(),
                     U.resolveClassLoader(task.getTask().getClass().getClassLoader(), ctx.config()));
 
             GridTaskSessionImpl ses = task.getSession();
@@ -1306,8 +1305,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
                     if (topic == null) {
                         assert req.topicBytes() != null;
 
-                        topic = MarshallerUtils.unmarshal(ctx.gridName(), marsh, req.topicBytes(),
-                            U.resolveClassLoader(ctx.config()));
+                        topic = marsh.unmarshal(req.topicBytes(), U.resolveClassLoader(ctx.config()));
                     }
 
                     boolean loc = ctx.localNodeId().equals(nodeId);
@@ -1315,7 +1313,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
                     ctx.io().send(nodeId, topic,
                         new GridJobSiblingsResponse(
                             loc ? siblings : null,
-                            loc ? null : MarshallerUtils.marshal(ctx, siblings)),
+                            loc ? null : marsh.marshal(siblings)),
                         SYSTEM_POOL);
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/66917b3f/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 10942da..8ce005a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -81,7 +81,6 @@ import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.resources.TaskContinuousMapperResource;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
@@ -775,15 +774,15 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                     try {
                         boolean loc = ctx.localNodeId().equals(res.getNodeId()) && !ctx.config().isMarshalLocalJobs();
 
-                        Object res0 = loc ? res.getJobResult() : MarshallerUtils.unmarshal(ctx.gridName(), marsh,
-                            res.getJobResultBytes(), U.resolveClassLoader(clsLdr, ctx.config()));
+                        Object res0 = loc ? res.getJobResult() : marsh.unmarshal(res.getJobResultBytes(),
+                            U.resolveClassLoader(clsLdr, ctx.config()));
 
                         IgniteException ex = loc ? res.getException() :
-                            MarshallerUtils.<IgniteException>unmarshal(ctx.gridName(), marsh, res.getExceptionBytes(),
+                            marsh.<IgniteException>unmarshal(res.getExceptionBytes(),
                                 U.resolveClassLoader(clsLdr, ctx.config()));
 
                         Map<Object, Object> attrs = loc ? res.getJobAttributes() :
-                            MarshallerUtils.<Map<Object, Object>>unmarshal(ctx.gridName(), marsh, res.getJobAttributesBytes(),
+                            marsh.<Map<Object, Object>>unmarshal(res.getJobAttributesBytes(),
                                 U.resolveClassLoader(clsLdr, ctx.config()));
 
                         jobRes.onResponse(res0, ex, attrs, res.isCancelled());
@@ -1254,16 +1253,16 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         ses.getTaskName(),
                         ses.getUserVersion(),
                         ses.getTaskClassName(),
-                        loc ? null : MarshallerUtils.marshal(ctx, res.getJob()),
+                        loc ? null : marsh.marshal(res.getJob()),
                         loc ? res.getJob() : null,
                         ses.getStartTime(),
                         timeout,
                         ses.getTopology(),
-                        loc ? null : MarshallerUtils.marshal(ctx, ses.getJobSiblings()),
+                        loc ? null : marsh.marshal(ses.getJobSiblings()),
                         loc ? ses.getJobSiblings() : null,
-                        loc ? null : MarshallerUtils.marshal(ctx, sesAttrs),
+                        loc ? null : marsh.marshal(sesAttrs),
                         loc ? sesAttrs : null,
-                        loc ? null : MarshallerUtils.marshal(ctx, jobAttrs),
+                        loc ? null : marsh.marshal(jobAttrs),
                         loc ? jobAttrs : null,
                         ses.getCheckpointSpi(),
                         dep.classLoaderId(),