You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/07 11:17:25 UTC

[40/50] [abbrv] ignite git commit: WIP.

WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51d5f03b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51d5f03b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51d5f03b

Branch: refs/heads/ignite-2649
Commit: 51d5f03bcc5c9bfb5d2d06d66e135e2342ea53b6
Parents: a2c73d0
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 7 13:42:36 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 7 13:42:36 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 14 ++++----
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 38 ++++++++++----------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     | 38 --------------------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 17 ++++++---
 .../TcpDiscoveryCustomEventMessage.java         | 17 ++-------
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |  2 +-
 6 files changed, 43 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/51d5f03b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 78a1911..a359a9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -430,7 +430,8 @@ class ClientImpl extends TcpDiscoveryImpl {
             throw new IgniteClientDisconnectedException(null, "Failed to send custom message: client is disconnected.");
 
         try {
-            sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marshal(evt)));
+            sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
+                spi.marshaller().marshal(evt)));
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -685,7 +686,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             attrs.put(
                 IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
-                marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))
+                spi.marshaller().marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))
             );
 
             node.setAttributes(attrs);
@@ -887,7 +888,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         TcpDiscoveryAbstractMessage msg;
 
                         try {
-                            msg = unmarshal(in);
+                            msg = spi.marshaller().unmarshal(in, U.resolveClassLoader(spi.ignite().configuration()));
                         }
                         catch (IgniteCheckedException e) {
                             if (log.isDebugEnabled())
@@ -1212,7 +1213,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                         List<TcpDiscoveryAbstractMessage> msgs = null;
 
                         while (!isInterrupted()) {
-                            TcpDiscoveryAbstractMessage msg = unmarshal(in);
+                            TcpDiscoveryAbstractMessage msg =
+                                spi.marshaller().unmarshal(in, U.resolveClassLoader(spi.ignite().configuration()));
 
                             if (msg instanceof TcpDiscoveryClientReconnectMessage) {
                                 TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
@@ -1965,8 +1967,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     if (node != null && node.visible()) {
                         try {
-                            DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh,
-                                U.resolveClassLoader(spi.ignite().configuration()), spi.ignite().name());
+                            DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(),
+                                U.resolveClassLoader(spi.ignite().configuration()));
 
                             notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51d5f03b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2d58842..35cd5d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -91,7 +91,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.security.SecurityPermissionSet;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
@@ -742,7 +741,8 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
         try {
-            msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marshal(evt)));
+            msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
+                spi.marshaller().marshal(evt)));
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -825,7 +825,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         Map<String, Object> attrs = new HashMap<>(locNode.attributes());
 
-                        attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, marshal(subj));
+                        attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, spi.marshaller().marshal(subj));
                         attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
 
                         locNode.setAttributes(attrs);
@@ -1242,7 +1242,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             attrs.put(
                 IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
-                marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))
+                spi.marshaller().marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))
             );
 
             node.setAttributes(attrs);
@@ -1266,7 +1266,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (credBytes == null)
                 return null;
 
-            return MarshallerUtils.unmarshal(spi.ignite().name(), spi.marsh, credBytes, null);
+            return spi.marshaller().unmarshal(credBytes, null);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e);
@@ -2360,7 +2360,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
                     if (msgBytes == null) {
                         try {
-                            msgBytes = marshal(msg);
+                            msgBytes = spi.marshaller().marshal(msg);
                         }
                         catch (IgniteCheckedException e) {
                             U.error(log, "Failed to marshal message: " + msg, e);
@@ -2379,7 +2379,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         if (clientMsgWorker.clientNodeId.equals(node.id())) {
                             try {
-                                msg0 = unmarshal(msgBytes);
+                                msg0 = spi.marshaller().unmarshal(msgBytes,
+                                    U.resolveClassLoader(spi.ignite().configuration()));
 
                                 prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null, null, null);
 
@@ -3137,7 +3138,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             // Stick in authentication subject to node (use security-safe attributes for copy).
                             Map<String, Object> attrs = new HashMap<>(node.getAttributes());
 
-                            attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, marshal(subj));
+                            attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, spi.marshaller().marshal(subj));
 
                             node.setAttributes(attrs);
                         }
@@ -3788,8 +3789,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                         else {
                             SecurityContext subj = spi.nodeAuth.authenticateNode(node, cred);
 
-                            SecurityContext coordSubj =
-                                unmarshal(node.<byte[]>attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT));
+                            SecurityContext coordSubj = spi.marshaller().unmarshal(
+                                node.<byte[]>attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT),
+                                U.resolveClassLoader(spi.ignite().configuration()));
 
                             if (!permissionsEqual(coordSubj.subject().permissions(), subj.subject().permissions())) {
                                 // Node has not pass authentication.
@@ -4840,8 +4842,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     DiscoverySpiCustomMessage msgObj = null;
 
                     try {
-                        msgObj = msg.message(spi.marsh, U.resolveClassLoader(spi.ignite().configuration()),
-                                spi.ignite().name());
+                        msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));
                     }
                     catch (Throwable e) {
                         U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -4853,7 +4854,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (nextMsg != null) {
                             try {
                                 TcpDiscoveryCustomEventMessage ackMsg =
-                                    new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg, marshal(nextMsg));
+                                    new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg,
+                                        spi.marshaller().marshal(nextMsg));
 
                                 ackMsg.topologyVersion(msg.topologyVersion());
 
@@ -4986,8 +4988,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     try {
                         final IgniteConfiguration cfg = spi.ignite().configuration();
 
-                        DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh,
-                            U.resolveClassLoader(cfg), cfg.getGridName());
+                        DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(cfg));
 
                         lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
                             msg.topologyVersion(),
@@ -4997,7 +4998,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             msgObj);
 
                         if (msgObj.isMutable())
-                            msg.message(msgObj, marshal(msgObj));
+                            msg.message(msgObj, spi.marshaller().marshal(msgObj));
                     }
                     catch (Throwable e) {
                         U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -5433,7 +5434,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 while (!isInterrupted()) {
                     try {
-                        TcpDiscoveryAbstractMessage msg = unmarshal(in);
+                        TcpDiscoveryAbstractMessage msg =
+                            spi.marshaller().unmarshal(in, U.resolveClassLoader(spi.ignite().configuration()));
 
                         msg.senderNodeId(nodeId);
 
@@ -5923,7 +5925,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 byte[] msgBytes = msgT.get2();
 
                 if (msgBytes == null)
-                    msgBytes = marshal(msg);
+                    msgBytes = spi.marshaller().marshal(msg);
 
                 if (msg instanceof TcpDiscoveryClientAckResponse) {
                     if (clientVer == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/51d5f03b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 8abcb18..341e536 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.spi.discovery.tcp;
 
-import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -28,14 +27,12 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedDeque;
 
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.spi.IgniteSpiContext;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.IgniteSpiThread;
@@ -330,39 +327,4 @@ abstract class TcpDiscoveryImpl {
 
         return res;
     }
-
-    /**
-     * Marshal object.
-     *
-     * @param obj Object.
-     * @return Result.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected byte[] marshal(Object obj) throws IgniteCheckedException {
-        return MarshallerUtils.withNodeName(spi.marsh, spi.ignite().name()).marshal(obj);
-    }
-
-    /**
-     * Unmarshal object.
-     *
-     * @param data Data.
-     * @return Result.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected <T> T unmarshal(byte[] data) throws IgniteCheckedException {
-        return MarshallerUtils.withNodeName(spi.marsh, spi.ignite().name())
-            .unmarshal(data, U.resolveClassLoader(spi.ignite().configuration()));
-    }
-
-    /**
-     * Unmarshal object.
-     *
-     * @param in Input stream.
-     * @return Result.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected <T> T unmarshal(InputStream in) throws IgniteCheckedException {
-        return MarshallerUtils.withNodeName(spi.marsh, spi.ignite().name())
-            .unmarshal(in, U.resolveClassLoader(spi.ignite().configuration()));
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51d5f03b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 1de0504..1b37412 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -343,7 +343,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     protected volatile long gridStartTime;
 
     /** Marshaller. */
-    protected final Marshaller marsh = new JdkMarshaller();
+    private final Marshaller marsh = new JdkMarshaller();
 
     /** Statistics. */
     protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics();
@@ -1378,7 +1378,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         IgniteCheckedException err = null;
 
         try {
-            MarshallerUtils.withNodeName(marsh, ignite.name()).marshal(msg, out);
+            MarshallerUtils.withNodeName(marshaller(), ignite.name()).marshal(msg, out);
         }
         catch (IgniteCheckedException e) {
             err = e;
@@ -1462,7 +1462,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         try {
             sock.setSoTimeout((int)timeout);
 
-            T res = MarshallerUtils.withNodeName(marsh, ignite.name()).unmarshal(in == null ?
+            T res = MarshallerUtils.withNodeName(marshaller(), ignite.name()).unmarshal(in == null ?
                 sock.getInputStream() : in, U.resolveClassLoader(ignite.configuration()));
 
             return res;
@@ -1680,7 +1680,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         for (Map.Entry<Integer, Serializable> entry : data.entrySet()) {
             try {
-                byte[] bytes = marsh.marshal(entry.getValue());
+                byte[] bytes = marshaller().marshal(entry.getValue());
 
                 data0.put(entry.getKey(), bytes);
             }
@@ -1711,7 +1711,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
             try {
                 Serializable compData =
-                    MarshallerUtils.withNodeName(marsh, ignite.name()).unmarshal(entry.getValue(), clsLdr);
+                    MarshallerUtils.withNodeName(marshaller(), ignite.name()).unmarshal(entry.getValue(), clsLdr);
 
                 data0.put(entry.getKey(), compData);
             }
@@ -1988,6 +1988,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         impl.brakeConnection();
     }
 
+    /**
+     * @return Marshaller.
+     */
+    protected Marshaller marshaller() {
+        return MarshallerUtils.withNodeName(marsh, gridName);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoverySpi.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/51d5f03b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 897a4ed..f627104 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -22,7 +22,6 @@ import java.util.UUID;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -73,25 +72,13 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
 
     /**
      * @param marsh Marshaller.
-     * @param gridName Grid name.
-     * @return Deserialized message.
-     * @throws java.lang.Throwable if unmarshal failed.
-     */
-    @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, String gridName) throws Throwable {
-        return message(marsh, null, gridName);
-    }
-
-    /**
-     * @param marsh Marshaller.
      * @param ldr Class loader.
-     * @param gridName Grid name.
      * @return Deserialized message.
      * @throws java.lang.Throwable if unmarshal failed.
      */
-    @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr,
-        final String gridName) throws Throwable {
+    @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable {
         if (msg == null) {
-            msg = MarshallerUtils.unmarshal(gridName, marsh, msgBytes, ldr);
+            msg = marsh.unmarshal(msgBytes, ldr);
 
             assert msg != null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51d5f03b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 7014608..b688b1f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -1964,7 +1964,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
                 if (msg instanceof TcpDiscoveryCustomEventMessage) {
                     try {
                         DiscoveryCustomMessage custMsg = GridTestUtils.getFieldValue(
-                            ((TcpDiscoveryCustomEventMessage)msg).message(marsh, null), "delegate");
+                            ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), null), "delegate");
 
                         if (custMsg instanceof StartRoutineAckDiscoveryMessage) {
                             log.info("Skip message send and stop node: " + msg);