You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/07 11:17:25 UTC
[40/50] [abbrv] ignite git commit: WIP.
WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51d5f03b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51d5f03b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51d5f03b
Branch: refs/heads/ignite-2649
Commit: 51d5f03bcc5c9bfb5d2d06d66e135e2342ea53b6
Parents: a2c73d0
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 7 13:42:36 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 7 13:42:36 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 14 ++++----
.../ignite/spi/discovery/tcp/ServerImpl.java | 38 ++++++++++----------
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 38 --------------------
.../spi/discovery/tcp/TcpDiscoverySpi.java | 17 ++++++---
.../TcpDiscoveryCustomEventMessage.java | 17 ++-------
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 2 +-
6 files changed, 43 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/51d5f03b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 78a1911..a359a9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -430,7 +430,8 @@ class ClientImpl extends TcpDiscoveryImpl {
throw new IgniteClientDisconnectedException(null, "Failed to send custom message: client is disconnected.");
try {
- sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marshal(evt)));
+ sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
+ spi.marshaller().marshal(evt)));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -685,7 +686,7 @@ class ClientImpl extends TcpDiscoveryImpl {
attrs.put(
IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
- marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))
+ spi.marshaller().marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))
);
node.setAttributes(attrs);
@@ -887,7 +888,7 @@ class ClientImpl extends TcpDiscoveryImpl {
TcpDiscoveryAbstractMessage msg;
try {
- msg = unmarshal(in);
+ msg = spi.marshaller().unmarshal(in, U.resolveClassLoader(spi.ignite().configuration()));
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -1212,7 +1213,8 @@ class ClientImpl extends TcpDiscoveryImpl {
List<TcpDiscoveryAbstractMessage> msgs = null;
while (!isInterrupted()) {
- TcpDiscoveryAbstractMessage msg = unmarshal(in);
+ TcpDiscoveryAbstractMessage msg =
+ spi.marshaller().unmarshal(in, U.resolveClassLoader(spi.ignite().configuration()));
if (msg instanceof TcpDiscoveryClientReconnectMessage) {
TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
@@ -1965,8 +1967,8 @@ class ClientImpl extends TcpDiscoveryImpl {
if (node != null && node.visible()) {
try {
- DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh,
- U.resolveClassLoader(spi.ignite().configuration()), spi.ignite().name());
+ DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(),
+ U.resolveClassLoader(spi.ignite().configuration()));
notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51d5f03b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2d58842..35cd5d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -91,7 +91,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.plugin.security.SecurityPermissionSet;
import org.apache.ignite.spi.IgniteNodeValidationResult;
@@ -742,7 +741,8 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
try {
- msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marshal(evt)));
+ msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
+ spi.marshaller().marshal(evt)));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -825,7 +825,7 @@ class ServerImpl extends TcpDiscoveryImpl {
Map<String, Object> attrs = new HashMap<>(locNode.attributes());
- attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, marshal(subj));
+ attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, spi.marshaller().marshal(subj));
attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
locNode.setAttributes(attrs);
@@ -1242,7 +1242,7 @@ class ServerImpl extends TcpDiscoveryImpl {
attrs.put(
IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
- marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))
+ spi.marshaller().marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS))
);
node.setAttributes(attrs);
@@ -1266,7 +1266,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (credBytes == null)
return null;
- return MarshallerUtils.unmarshal(spi.ignite().name(), spi.marsh, credBytes, null);
+ return spi.marshaller().unmarshal(credBytes, null);
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e);
@@ -2360,7 +2360,7 @@ class ServerImpl extends TcpDiscoveryImpl {
for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
if (msgBytes == null) {
try {
- msgBytes = marshal(msg);
+ msgBytes = spi.marshaller().marshal(msg);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal message: " + msg, e);
@@ -2379,7 +2379,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (clientMsgWorker.clientNodeId.equals(node.id())) {
try {
- msg0 = unmarshal(msgBytes);
+ msg0 = spi.marshaller().unmarshal(msgBytes,
+ U.resolveClassLoader(spi.ignite().configuration()));
prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null, null, null);
@@ -3137,7 +3138,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Stick in authentication subject to node (use security-safe attributes for copy).
Map<String, Object> attrs = new HashMap<>(node.getAttributes());
- attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, marshal(subj));
+ attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, spi.marshaller().marshal(subj));
node.setAttributes(attrs);
}
@@ -3788,8 +3789,9 @@ class ServerImpl extends TcpDiscoveryImpl {
else {
SecurityContext subj = spi.nodeAuth.authenticateNode(node, cred);
- SecurityContext coordSubj =
- unmarshal(node.<byte[]>attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT));
+ SecurityContext coordSubj = spi.marshaller().unmarshal(
+ node.<byte[]>attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT),
+ U.resolveClassLoader(spi.ignite().configuration()));
if (!permissionsEqual(coordSubj.subject().permissions(), subj.subject().permissions())) {
// Node has not pass authentication.
@@ -4840,8 +4842,7 @@ class ServerImpl extends TcpDiscoveryImpl {
DiscoverySpiCustomMessage msgObj = null;
try {
- msgObj = msg.message(spi.marsh, U.resolveClassLoader(spi.ignite().configuration()),
- spi.ignite().name());
+ msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -4853,7 +4854,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (nextMsg != null) {
try {
TcpDiscoveryCustomEventMessage ackMsg =
- new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg, marshal(nextMsg));
+ new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg,
+ spi.marshaller().marshal(nextMsg));
ackMsg.topologyVersion(msg.topologyVersion());
@@ -4986,8 +4988,7 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
final IgniteConfiguration cfg = spi.ignite().configuration();
- DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh,
- U.resolveClassLoader(cfg), cfg.getGridName());
+ DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(cfg));
lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
msg.topologyVersion(),
@@ -4997,7 +4998,7 @@ class ServerImpl extends TcpDiscoveryImpl {
msgObj);
if (msgObj.isMutable())
- msg.message(msgObj, marshal(msgObj));
+ msg.message(msgObj, spi.marshaller().marshal(msgObj));
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -5433,7 +5434,8 @@ class ServerImpl extends TcpDiscoveryImpl {
while (!isInterrupted()) {
try {
- TcpDiscoveryAbstractMessage msg = unmarshal(in);
+ TcpDiscoveryAbstractMessage msg =
+ spi.marshaller().unmarshal(in, U.resolveClassLoader(spi.ignite().configuration()));
msg.senderNodeId(nodeId);
@@ -5923,7 +5925,7 @@ class ServerImpl extends TcpDiscoveryImpl {
byte[] msgBytes = msgT.get2();
if (msgBytes == null)
- msgBytes = marshal(msg);
+ msgBytes = spi.marshaller().marshal(msg);
if (msg instanceof TcpDiscoveryClientAckResponse) {
if (clientVer == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/51d5f03b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 8abcb18..341e536 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -17,7 +17,6 @@
package org.apache.ignite.spi.discovery.tcp;
-import java.io.InputStream;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -28,14 +27,12 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiThread;
@@ -330,39 +327,4 @@ abstract class TcpDiscoveryImpl {
return res;
}
-
- /**
- * Marshal object.
- *
- * @param obj Object.
- * @return Result.
- * @throws IgniteCheckedException If failed.
- */
- protected byte[] marshal(Object obj) throws IgniteCheckedException {
- return MarshallerUtils.withNodeName(spi.marsh, spi.ignite().name()).marshal(obj);
- }
-
- /**
- * Unmarshal object.
- *
- * @param data Data.
- * @return Result.
- * @throws IgniteCheckedException If failed.
- */
- protected <T> T unmarshal(byte[] data) throws IgniteCheckedException {
- return MarshallerUtils.withNodeName(spi.marsh, spi.ignite().name())
- .unmarshal(data, U.resolveClassLoader(spi.ignite().configuration()));
- }
-
- /**
- * Unmarshal object.
- *
- * @param in Input stream.
- * @return Result.
- * @throws IgniteCheckedException If failed.
- */
- protected <T> T unmarshal(InputStream in) throws IgniteCheckedException {
- return MarshallerUtils.withNodeName(spi.marsh, spi.ignite().name())
- .unmarshal(in, U.resolveClassLoader(spi.ignite().configuration()));
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51d5f03b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 1de0504..1b37412 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -343,7 +343,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
protected volatile long gridStartTime;
/** Marshaller. */
- protected final Marshaller marsh = new JdkMarshaller();
+ private final Marshaller marsh = new JdkMarshaller();
/** Statistics. */
protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics();
@@ -1378,7 +1378,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
IgniteCheckedException err = null;
try {
- MarshallerUtils.withNodeName(marsh, ignite.name()).marshal(msg, out);
+ MarshallerUtils.withNodeName(marshaller(), ignite.name()).marshal(msg, out);
}
catch (IgniteCheckedException e) {
err = e;
@@ -1462,7 +1462,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
try {
sock.setSoTimeout((int)timeout);
- T res = MarshallerUtils.withNodeName(marsh, ignite.name()).unmarshal(in == null ?
+ T res = MarshallerUtils.withNodeName(marshaller(), ignite.name()).unmarshal(in == null ?
sock.getInputStream() : in, U.resolveClassLoader(ignite.configuration()));
return res;
@@ -1680,7 +1680,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
for (Map.Entry<Integer, Serializable> entry : data.entrySet()) {
try {
- byte[] bytes = marsh.marshal(entry.getValue());
+ byte[] bytes = marshaller().marshal(entry.getValue());
data0.put(entry.getKey(), bytes);
}
@@ -1711,7 +1711,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
try {
Serializable compData =
- MarshallerUtils.withNodeName(marsh, ignite.name()).unmarshal(entry.getValue(), clsLdr);
+ MarshallerUtils.withNodeName(marshaller(), ignite.name()).unmarshal(entry.getValue(), clsLdr);
data0.put(entry.getKey(), compData);
}
@@ -1988,6 +1988,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
impl.brakeConnection();
}
+ /**
+ * @return Marshaller.
+ */
+ protected Marshaller marshaller() {
+ return MarshallerUtils.withNodeName(marsh, gridName);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoverySpi.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/51d5f03b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 897a4ed..f627104 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -22,7 +22,6 @@ import java.util.UUID;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -73,25 +72,13 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
/**
* @param marsh Marshaller.
- * @param gridName Grid name.
- * @return Deserialized message.
- * @throws java.lang.Throwable if unmarshal failed.
- */
- @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, String gridName) throws Throwable {
- return message(marsh, null, gridName);
- }
-
- /**
- * @param marsh Marshaller.
* @param ldr Class loader.
- * @param gridName Grid name.
* @return Deserialized message.
* @throws java.lang.Throwable if unmarshal failed.
*/
- @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr,
- final String gridName) throws Throwable {
+ @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable {
if (msg == null) {
- msg = MarshallerUtils.unmarshal(gridName, marsh, msgBytes, ldr);
+ msg = marsh.unmarshal(msgBytes, ldr);
assert msg != null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51d5f03b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 7014608..b688b1f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -1964,7 +1964,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
if (msg instanceof TcpDiscoveryCustomEventMessage) {
try {
DiscoveryCustomMessage custMsg = GridTestUtils.getFieldValue(
- ((TcpDiscoveryCustomEventMessage)msg).message(marsh, null), "delegate");
+ ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), null), "delegate");
if (custMsg instanceof StartRoutineAckDiscoveryMessage) {
log.info("Skip message send and stop node: " + msg);