You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/29 14:46:13 UTC
[06/50] incubator-ignite git commit: IGNITE-709 Fix
CacheListenerTest#testDeregistration()
IGNITE-709 Fix CacheListenerTest#testDeregistration()
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8fbb590f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8fbb590f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8fbb590f
Branch: refs/heads/ignite-929
Commit: 8fbb590f2e659c785c6d89ea49e1e4bb3fdc666f
Parents: 0ae75d8
Author: sevdokimov <se...@jetbrains.com>
Authored: Sun May 24 20:23:01 2015 +0300
Committer: sevdokimov <se...@jetbrains.com>
Committed: Sun May 24 20:23:01 2015 +0300
----------------------------------------------------------------------
.../discovery/CustomMessageWrapper.java | 5 ++
.../discovery/DiscoveryCustomMessage.java | 5 ++
.../cache/DynamicCacheChangeBatch.java | 5 ++
.../continuous/AbstractContinuousMessage.java | 54 ++++++++++++++++++++
.../StartRoutineAckDiscoveryMessage.java | 20 ++------
.../StartRoutineDiscoveryMessage.java | 25 +++------
.../StopRoutineAckDiscoveryMessage.java | 19 +------
.../continuous/StopRoutineDiscoveryMessage.java | 19 +------
.../discovery/DiscoverySpiCustomMessage.java | 5 ++
.../discovery/tcp/TcpClientDiscoverySpi.java | 4 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 14 ++---
.../TcpDiscoveryCustomEventMessage.java | 31 +++++++++--
12 files changed, 126 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
index 0afb6cf..23f8bda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
@@ -44,6 +44,11 @@ class CustomMessageWrapper implements DiscoverySpiCustomMessage {
return res == null ? null : new CustomMessageWrapper(res);
}
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return delegate.isMutable();
+ }
+
/**
* @return Delegate.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index 13c0b9c..693bbef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -40,4 +40,9 @@ public interface DiscoveryCustomMessage extends Serializable {
* @return Ack message or {@code null} if ack is not required.
*/
@Nullable public DiscoveryCustomMessage ackMessage();
+
+ /**
+ * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
+ */
+ public boolean isMutable();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index ca257a9..5fcd0e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -83,4 +83,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
@Nullable @Override public DiscoveryCustomMessage ackMessage() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
new file mode 100644
index 0000000..f375777
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.internal.managers.discovery.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public abstract class AbstractContinuousMessage implements DiscoveryCustomMessage {
+ /** Routine ID. */
+ protected final UUID routineId;
+
+ /**
+ * @param id Id.
+ */
+ protected AbstractContinuousMessage(UUID id) {
+ routineId = id;
+ }
+
+ /**
+ * @return Routine ID.
+ */
+ public UUID routineId() {
+ return routineId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean incrementMinorTopologyVersion() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index 66892b1..3e3e6fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -26,13 +26,10 @@ import java.util.*;
/**
*
*/
-public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage {
+public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
/** */
private static final long serialVersionUID = 0L;
- /** Routine ID. */
- private final UUID routineId;
-
/** */
private final Map<UUID, IgniteCheckedException> errs;
@@ -41,13 +38,9 @@ public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage {
* @param errs Errs.
*/
public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) {
- this.routineId = routineId;
- this.errs = new HashMap<>(errs);
- }
+ super(routineId);
- /** {@inheritDoc} */
- @Override public boolean incrementMinorTopologyVersion() {
- return false;
+ this.errs = new HashMap<>(errs);
}
/** {@inheritDoc} */
@@ -56,13 +49,6 @@ public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage {
}
/**
- * @return Routine ID.
- */
- public UUID routineId() {
- return routineId;
- }
-
- /**
* @return Errs.
*/
public Map<UUID, IgniteCheckedException> errs() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index 2199fd0..ec0d36b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -26,13 +26,10 @@ import java.util.*;
/**
*
*/
-public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage {
+public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** */
private static final long serialVersionUID = 0L;
- /** Routine ID. */
- private final UUID routineId;
-
/** */
private final StartRequestData startReqData;
@@ -44,13 +41,9 @@ public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage {
* @param startReqData Start request data.
*/
public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) {
- this.routineId = routineId;
- this.startReqData = startReqData;
- }
+ super(routineId);
- /** {@inheritDoc} */
- @Override public boolean incrementMinorTopologyVersion() {
- return false;
+ this.startReqData = startReqData;
}
/**
@@ -69,13 +62,6 @@ public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage {
}
/**
- * @return Routine ID.
- */
- public UUID routineId() {
- return routineId;
- }
-
- /**
* @return Errs.
*/
public Map<UUID, IgniteCheckedException> errs() {
@@ -83,6 +69,11 @@ public class StartRoutineDiscoveryMessage implements DiscoveryCustomMessage {
}
/** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override public DiscoveryCustomMessage ackMessage() {
return new StartRoutineAckDiscoveryMessage(routineId, errs);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
index a640222..350f13c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
@@ -25,34 +25,19 @@ import java.util.*;
/**
*
*/
-public class StopRoutineAckDiscoveryMessage implements DiscoveryCustomMessage {
+public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
/** */
private static final long serialVersionUID = 0L;
- /** Routine ID. */
- private final UUID routineId;
-
/**
* @param routineId Routine id.
*/
public StopRoutineAckDiscoveryMessage(UUID routineId) {
- this.routineId = routineId;
- }
-
- /** {@inheritDoc} */
- @Override public boolean incrementMinorTopologyVersion() {
- return false;
+ super(routineId);
}
/** {@inheritDoc} */
@Nullable @Override public DiscoveryCustomMessage ackMessage() {
return null;
}
-
- /**
- * @return Routine ID.
- */
- public UUID routineId() {
- return routineId;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
index e8a43a3..5b0dc5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
@@ -25,30 +25,15 @@ import java.util.*;
/**
*
*/
-public class StopRoutineDiscoveryMessage implements DiscoveryCustomMessage {
+public class StopRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** */
private static final long serialVersionUID = 0L;
- /** Routine ID. */
- private final UUID routineId;
-
/**
* @param routineId Routine id.
*/
public StopRoutineDiscoveryMessage(UUID routineId) {
- this.routineId = routineId;
- }
-
- /** {@inheritDoc} */
- @Override public boolean incrementMinorTopologyVersion() {
- return false;
- }
-
- /**
- * @return Routine ID.
- */
- public UUID routineId() {
- return routineId;
+ super(routineId);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index 72ba9db..15e943b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -32,4 +32,9 @@ public interface DiscoverySpiCustomMessage extends Serializable {
* Called when message passed the ring.
*/
@Nullable public DiscoverySpiCustomMessage ackMessage();
+
+ /**
+ * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
+ */
+ public boolean isMutable();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 46e9635..22bb49b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -461,7 +461,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
throw new IgniteException("Failed to send custom message: client is disconnected");
try {
- sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt)));
+ sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marsh.marshal(evt)));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -1481,7 +1481,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
if (node != null && node.visible()) {
try {
- DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
+ DiscoverySpiCustomMessage msgObj = msg.message(marsh);
notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 0164e5c..34e1ca8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1266,7 +1266,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
try {
- msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt)));
+ msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marsh.marshal(evt)));
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -4536,7 +4536,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
DiscoverySpiCustomMessage msgObj = null;
try {
- msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
+ msgObj = msg.message(marsh);
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -4547,7 +4547,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
if (nextMsg != null) {
try {
- addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(nextMsg)));
+ addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), nextMsg,
+ marsh.marshal(nextMsg)));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal discovery custom message.", e);
@@ -4584,13 +4585,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion());
if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) {
- assert msg.messageBytes() != null;
-
TcpDiscoveryNode node = ring.node(msg.creatorNodeId());
if (node != null) {
try {
- DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
+ DiscoverySpiCustomMessage msgObj = msg.message(marsh);
lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
msg.topologyVersion(),
@@ -4599,7 +4598,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
hist,
msgObj);
- msg.messageBytes(marsh.marshal(msgObj));
+ if (msgObj.isMutable())
+ msg.message(msgObj, marsh.marshal(msgObj));
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fbb590f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 372aa18..0739c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -18,6 +18,9 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.spi.discovery.*;
+import org.jetbrains.annotations.*;
import java.util.*;
@@ -31,15 +34,21 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
private static final long serialVersionUID = 0L;
/** */
+ private transient volatile DiscoverySpiCustomMessage msg;
+
+ /** */
private byte[] msgBytes;
/**
* @param creatorNodeId Creator node id.
+ * @param msg Message.
* @param msgBytes Serialized message.
*/
- public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, byte[] msgBytes) {
+ public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg,
+ @NotNull byte[] msgBytes) {
super(creatorNodeId);
+ this.msg = msg;
this.msgBytes = msgBytes;
}
@@ -51,12 +60,28 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
}
/**
- * @param msgBytes New message bytes.
+ * @param msg Message.
+ * @param msgBytes Serialized message.
*/
- public void messageBytes(byte[] msgBytes) {
+ public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msgBytes) {
+ this.msg = msg;
this.msgBytes = msgBytes;
}
+ /**
+ * @return Deserialized message,
+ * @throws java.lang.Throwable if unmarshal failed.
+ */
+ @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh) throws Throwable {
+ if (msg == null) {
+ msg = marsh.unmarshal(msgBytes, U.gridClassLoader());
+
+ assert msg != null;
+ }
+
+ return msg;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString());