You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/28 12:56:09 UTC
[2/2] ignite git commit: ignite-4296-1
ignite-4296-1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/660f2a38
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/660f2a38
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/660f2a38
Branch: refs/heads/ignite-4296-1
Commit: 660f2a38b4af5fa81117713c7ddd06cdb6b0781c
Parents: efe38d4
Author: sboikov <sb...@gridgain.com>
Authored: Mon Nov 28 15:55:34 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Nov 28 15:55:34 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 45 +++++++++--
.../ignite/spi/discovery/tcp/ServerImpl.java | 84 +++++++++++++++++---
.../spi/discovery/tcp/TcpDiscoverySpi.java | 12 ++-
.../TcpDiscoveryCustomEventMessage.java | 6 +-
.../TcpDiscoveryJoinRequestMessage.java | 12 ++-
.../messages/TcpDiscoveryNodeAddedMessage.java | 10 +++
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 2 +-
7 files changed, 145 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 7b8c029..40a5cf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -430,8 +430,12 @@ class ClientImpl extends TcpDiscoveryImpl {
throw new IgniteClientDisconnectedException(null, "Failed to send custom message: client is disconnected.");
try {
- sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
- U.marshal(spi.marshaller(), evt)));
+ byte[] bytes = U.marshal(spi.marshaller(), evt);
+
+ if (TcpDiscoverySpi.COMPRESS_MSGS)
+ bytes = U.zip(bytes);
+
+ sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, bytes));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -624,7 +628,22 @@ class ClientImpl extends TcpDiscoveryImpl {
if (locNode.order() > 0)
node = locNode.clientReconnectNode();
- msg = new TcpDiscoveryJoinRequestMessage(node, spi.collectExchangeData(getLocalNodeId()));
+ Map<Integer, byte[]> discoData = spi.collectExchangeData(getLocalNodeId());
+ byte[] discoDataBytes = null;
+
+ if (TcpDiscoverySpi.COMPRESS_MSGS) {
+ try {
+ log.info("Compress join request data.");
+
+ discoDataBytes = U.zip(spi.marshaller().marshal(discoData));
+ discoData = null;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSpiException(e);
+ }
+ }
+
+ msg = new TcpDiscoveryJoinRequestMessage(node, discoData, discoDataBytes);
}
else
msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId);
@@ -1664,9 +1683,18 @@ class ClientImpl extends TcpDiscoveryImpl {
Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
+ try {
+ if (data == null && msg.newNodeDiscoveryDataBytes() != null)
+ data = U.unmarshalZip(spi.marshaller(), msg.newNodeDiscoveryDataBytes(), U.resolveClassLoader(spi.ignite().configuration()));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshall discovery data", e);
+ }
+
if (data != null)
spi.onExchange(newNodeId, newNodeId, data,
- U.resolveClassLoader(spi.ignite().configuration()));
+ U.resolveClassLoader(spi.ignite().configuration()),
+ false);
}
}
else {
@@ -1689,8 +1717,11 @@ class ClientImpl extends TcpDiscoveryImpl {
if (dataMap != null) {
for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
- spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(),
- U.resolveClassLoader(spi.ignite().configuration()));
+ spi.onExchange(getLocalNodeId(),
+ entry.getKey(),
+ entry.getValue(),
+ U.resolveClassLoader(spi.ignite().configuration()),
+ TcpDiscoverySpi.COMPRESS_MSGS);
}
locNode.setAttributes(msg.clientNodeAttributes());
@@ -1994,7 +2025,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (node != null && node.visible()) {
try {
DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(),
- U.resolveClassLoader(spi.ignite().configuration()));
+ U.resolveClassLoader(spi.ignite().configuration()), TcpDiscoverySpi.COMPRESS_MSGS);
notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8407e4f..b1643ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -760,8 +760,13 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
try {
+ byte[] bytes = U.marshal(spi.marshaller(), evt);
+
+ if (TcpDiscoverySpi.COMPRESS_MSGS)
+ bytes = U.zip(bytes);
+
msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
- U.marshal(spi.marshaller(), evt)));
+ bytes));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -970,8 +975,24 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
@SuppressWarnings({"BusyWait"})
private boolean sendJoinRequestMessage() throws IgniteSpiException {
+ Map<Integer, byte[]> discoData = spi.collectExchangeData(getLocalNodeId());
+ byte[] discoDataBytes = null;
+
+ if (TcpDiscoverySpi.COMPRESS_MSGS) {
+ try {
+ log.info("Compress join request data.");
+
+ discoDataBytes = U.zip(spi.marshaller().marshal(discoData));
+ discoData = null;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSpiException(e);
+ }
+ }
+
TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
- spi.collectExchangeData(getLocalNodeId()));
+ discoData,
+ discoDataBytes);
// Time when it has been detected, that addresses from IP finder do not respond.
long noResStart = 0;
@@ -1543,6 +1564,8 @@ class ServerImpl extends TcpDiscoveryImpl {
nodeAddedMsg.topologyHistory(hist);
}
+ else
+ clearNodeAddedMessage(nodeAddedMsg);
}
}
@@ -3762,7 +3785,10 @@ class ServerImpl extends TcpDiscoveryImpl {
log.info("Created TcpDiscoveryNodeAddedMessage [node=" + node.id() + ']');
TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
- node, msg.discoveryData(), spi.gridStartTime);
+ node,
+ msg.discoveryData(),
+ msg.discoveryDataBytes(),
+ spi.gridStartTime);
nodeAddedMsg.client(msg.client());
@@ -3798,8 +3824,9 @@ class ServerImpl extends TcpDiscoveryImpl {
IgniteSpiException sndErr = null;
Integer res = null;
- TcpDiscoveryJoinRequestMessage msg0 =
- new TcpDiscoveryJoinRequestMessage(msg.node(), msg.discoveryData());
+ TcpDiscoveryJoinRequestMessage msg0 = new TcpDiscoveryJoinRequestMessage(msg.node(),
+ msg.discoveryData(),
+ msg.discoveryDataBytes());
try {
res = trySendMessageDirectly(crd, msg0);
@@ -4190,10 +4217,30 @@ class ServerImpl extends TcpDiscoveryImpl {
Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
+ try {
+ if (data == null && msg.newNodeDiscoveryDataBytes() != null)
+ data = U.unmarshalZip(spi.marshaller(), msg.newNodeDiscoveryDataBytes(), U.resolveClassLoader(spi.ignite().configuration()));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshall discovery data", e);
+ }
+
if (data != null)
- spi.onExchange(node.id(), node.id(), data, U.resolveClassLoader(spi.ignite().configuration()));
+ spi.onExchange(node.id(), node.id(), data, U.resolveClassLoader(spi.ignite().configuration()), false);
- msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
+ Map<Integer, byte[]> discoData = spi.collectExchangeData(node.id());
+
+ if (TcpDiscoverySpi.COMPRESS_MSGS) {
+ try {
+ for (Map.Entry<Integer, byte[]> e : discoData.entrySet())
+ e.setValue(U.zip(e.getValue()));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to compress message data", e);
+ }
+ }
+
+ msg.addDiscoveryData(locNodeId, discoData);
processMessageFailedNodes(msg);
}
@@ -4317,9 +4364,15 @@ class ServerImpl extends TcpDiscoveryImpl {
// Notify outside of synchronized block.
if (dataMap != null) {
- for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
- spi.onExchange(node.id(), entry.getKey(), entry.getValue(),
- U.resolveClassLoader(spi.ignite().configuration()));
+ ClassLoader ldr = U.resolveClassLoader(spi.ignite().configuration());
+
+ for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) {
+ spi.onExchange(node.id(),
+ entry.getKey(),
+ entry.getValue(),
+ ldr,
+ TcpDiscoverySpi.COMPRESS_MSGS);
+ }
}
processMessageFailedNodes(msg);
@@ -5258,7 +5311,7 @@ class ServerImpl extends TcpDiscoveryImpl {
DiscoverySpiCustomMessage msgObj = null;
try {
- msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));
+ msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()), TcpDiscoverySpi.COMPRESS_MSGS);
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -5269,8 +5322,13 @@ class ServerImpl extends TcpDiscoveryImpl {
if (nextMsg != null) {
try {
+ byte[] bytes = U.marshal(spi.marshaller(), nextMsg);
+
+ if (TcpDiscoverySpi.COMPRESS_MSGS)
+ bytes = U.zip(bytes);
+
TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage(
- getLocalNodeId(), nextMsg, U.marshal(spi.marshaller(), nextMsg));
+ getLocalNodeId(), nextMsg, bytes);
ackMsg.topologyVersion(msg.topologyVersion());
@@ -5420,7 +5478,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (node != null) {
try {
DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(),
- U.resolveClassLoader(spi.ignite().configuration()));
+ U.resolveClassLoader(spi.ignite().configuration()), TcpDiscoverySpi.COMPRESS_MSGS);
lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
msg.topologyVersion(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index a8704e7..e5c52d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -99,6 +99,8 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessa
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryEnsureDelivery;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+
/**
* Discovery SPI implementation that uses TCP/IP for node discovery.
* <p>
@@ -218,6 +220,9 @@ import org.jetbrains.annotations.Nullable;
@DiscoverySpiOrderSupport(true)
@DiscoverySpiHistorySupport(true)
public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
+ /** */
+ public static final boolean COMPRESS_MSGS = getBoolean("COMPRESS_DISCO_MSGS", false);
+
/** Failure detection timeout feature major version. */
final static byte FAILURE_DETECTION_MAJOR_VER = 1;
@@ -1704,7 +1709,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
protected void onExchange(UUID joiningNodeID,
UUID nodeId,
Map<Integer, byte[]> data,
- ClassLoader clsLdr)
+ ClassLoader clsLdr,
+ boolean zip)
{
if (locNode.isDaemon())
return;
@@ -1713,7 +1719,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
try {
- Serializable compData = U.unmarshal(marshaller(), entry.getValue(), clsLdr);
+ Serializable compData = zip ?
+ (Serializable)U.unmarshalZip(marshaller(), entry.getValue(), clsLdr) :
+ (Serializable)U.unmarshal(marshaller(), entry.getValue(), clsLdr);
data0.put(entry.getKey(), compData);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 219c0ad..a2ebdfc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -76,9 +76,11 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
* @return Deserialized message,
* @throws java.lang.Throwable if unmarshal failed.
*/
- @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable {
+ @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr, boolean zip) throws Throwable {
if (msg == null) {
- msg = U.unmarshal(marsh, msgBytes, ldr);
+ msg = zip ?
+ (DiscoverySpiCustomMessage)U.unmarshalZip(marsh, msgBytes, ldr) :
+ (DiscoverySpiCustomMessage)U.unmarshal(marsh, msgBytes, ldr);
assert msg != null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
index 4422919..a41e064 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -41,17 +41,23 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage
@GridToStringExclude
private transient boolean directSndFailed;
+ /** */
+ private final byte[] discoDataBytes;
+
/**
* Constructor.
*
* @param node New node that wants to join.
* @param discoData Discovery data.
*/
- public TcpDiscoveryJoinRequestMessage(TcpDiscoveryNode node, Map<Integer, byte[]> discoData) {
+ public TcpDiscoveryJoinRequestMessage(TcpDiscoveryNode node,
+ Map<Integer, byte[]> discoData,
+ byte[] discoDataBytes) {
super(node.id());
this.node = node;
this.discoData = discoData;
+ this.discoDataBytes = discoDataBytes;
}
/**
@@ -63,6 +69,10 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage
return node;
}
+ public byte[] discoveryDataBytes() {
+ return discoDataBytes;
+ }
+
/**
* @return Discovery data.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index bd52c04..7163d58 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -67,6 +67,9 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
/** Discovery data from new node. */
private Map<Integer, byte[]> newNodeDiscoData;
+ /** */
+ private byte[] newNodeDiscoDataBytes;
+
/** Discovery data from old nodes. */
private Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData;
@@ -84,6 +87,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId,
TcpDiscoveryNode node,
Map<Integer, byte[]> newNodeDiscoData,
+ byte[] newNodeDiscoDataBytes,
long gridStartTime)
{
super(creatorNodeId);
@@ -93,6 +97,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
this.node = node;
this.newNodeDiscoData = newNodeDiscoData;
+ this.newNodeDiscoDataBytes = newNodeDiscoDataBytes;
this.gridStartTime = gridStartTime;
oldNodesDiscoData = new LinkedHashMap<>();
@@ -112,6 +117,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
this.clientTop = msg.clientTop;
this.topHist = msg.topHist;
this.newNodeDiscoData = msg.newNodeDiscoData;
+ this.newNodeDiscoDataBytes = msg.newNodeDiscoDataBytes;
this.oldNodesDiscoData = msg.oldNodesDiscoData;
this.gridStartTime = msg.gridStartTime;
}
@@ -228,6 +234,10 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
return newNodeDiscoData;
}
+ public byte[] newNodeDiscoveryDataBytes() {
+ return newNodeDiscoDataBytes;
+ }
+
/**
* @return Discovery data from old nodes.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/660f2a38/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 043208c..38fbde7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -2189,7 +2189,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
if (msg instanceof TcpDiscoveryCustomEventMessage) {
try {
DiscoveryCustomMessage custMsg = GridTestUtils.getFieldValue(
- ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader()), "delegate");
+ ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader(), false), "delegate");
if (custMsg instanceof StartRoutineAckDiscoveryMessage) {
log.info("Skip message send and stop node: " + msg);