You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/13 09:33:21 UTC
[09/54] [abbrv] ignite git commit: IGNITE-7222 Added ZooKeeper
discovery SPI
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
new file mode 100644
index 0000000..c42fa57
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.util.GridLeanMap;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
+/**
+ * Tcp Communication Connection Check Future.
+ */
+public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject, GridLocalEventListener {
+ /** Session future. */
+ public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** */
+ private static final AtomicIntegerFieldUpdater<SingleAddressConnectFuture> connFutDoneUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(SingleAddressConnectFuture.class, "done");
+
+ /** */
+ private static final AtomicIntegerFieldUpdater<MultipleAddressesConnectFuture> connResCntUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(MultipleAddressesConnectFuture.class, "resCnt");
+
+ /** */
+ private final AtomicInteger resCntr = new AtomicInteger();
+
+ /** */
+ private final List<ClusterNode> nodes;
+
+ /** */
+ private volatile ConnectFuture[] futs;
+
+ /** */
+ private final GridNioServer nioSrvr;
+
+ /** */
+ private final TcpCommunicationSpi spi;
+
+ /** */
+ private final IgniteUuid timeoutObjId = IgniteUuid.randomUuid();
+
+ /** */
+ private final BitSet resBitSet;
+
+ /** */
+ private long endTime;
+
+ /** */
+ private final IgniteLogger log;
+
+ /**
+ * @param spi SPI instance.
+ * @param log Logger.
+ * @param nioSrvr NIO server.
+ * @param nodes Nodes to check.
+ */
+ public TcpCommunicationConnectionCheckFuture(TcpCommunicationSpi spi,
+ IgniteLogger log,
+ GridNioServer nioSrvr,
+ List<ClusterNode> nodes)
+ {
+ this.spi = spi;
+ this.log = log;
+ this.nioSrvr = nioSrvr;
+ this.nodes = nodes;
+
+ resBitSet = new BitSet(nodes.size());
+ }
+
+ /**
+ * @param timeout Connect timeout.
+ */
+ public void init(long timeout) {
+ ConnectFuture[] futs = new ConnectFuture[nodes.size()];
+
+ UUID locId = spi.getSpiContext().localNode().id();
+
+ for (int i = 0; i < nodes.size(); i++) {
+ ClusterNode node = nodes.get(i);
+
+ if (!node.id().equals(locId)) {
+ if (spi.getSpiContext().node(node.id()) == null) {
+ receivedConnectionStatus(i, false);
+
+ continue;
+ }
+
+ Collection<InetSocketAddress> addrs;
+
+ try {
+ addrs = spi.nodeAddresses(node, false);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to get node addresses: " + node, e);
+
+ receivedConnectionStatus(i, false);
+
+ continue;
+ }
+
+ if (addrs.size() == 1) {
+ SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i);
+
+ fut.init(addrs.iterator().next(), node.id());
+
+ futs[i] = fut;
+ }
+ else {
+ MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
+
+ fut.init(addrs, node.id());
+
+ futs[i] = fut;
+ }
+ }
+ else
+ receivedConnectionStatus(i, true);
+ }
+
+ this.futs = futs;
+
+ spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+ if (!isDone()) {
+ endTime = System.currentTimeMillis() + timeout;
+
+ spi.getSpiContext().addTimeoutObject(this);
+ }
+ }
+
+ /**
+ * @param idx Node index.
+ * @param res Success flag.
+ */
+ private void receivedConnectionStatus(int idx, boolean res) {
+ assert resCntr.get() < nodes.size();
+
+ synchronized (resBitSet) {
+ resBitSet.set(idx, res);
+ }
+
+ if (resCntr.incrementAndGet() == nodes.size())
+ onDone(resBitSet);
+ }
+
+ /**
+ * @param nodeIdx Node index.
+ * @return Node ID.
+ */
+ private UUID nodeId(int nodeIdx) {
+ return nodes.get(nodeIdx).id();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return timeoutObjId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onEvent(Event evt) {
+ if (isDone())
+ return;
+
+ assert evt instanceof DiscoveryEvent : evt;
+ assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
+
+ UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
+
+ for (int i = 0; i < nodes.size(); i++) {
+ if (nodes.get(i).id().equals(nodeId)) {
+ ConnectFuture fut = futs[i];
+
+ if (fut != null)
+ fut.onNodeFailed();
+
+ return;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ if (isDone())
+ return;
+
+ ConnectFuture[] futs = this.futs;
+
+ for (int i = 0; i < futs.length; i++) {
+ ConnectFuture fut = futs[i];
+
+ if (fut != null)
+ fut.onTimeout();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable BitSet res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ spi.getSpiContext().removeTimeoutObject(this);
+
+ spi.getSpiContext().removeLocalEventListener(this);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ *
+ */
+ private interface ConnectFuture {
+ /**
+ *
+ */
+ void onTimeout();
+
+ /**
+ *
+ */
+ void onNodeFailed();
+ }
+
+ /**
+ *
+ */
+ private class SingleAddressConnectFuture implements TcpCommunicationNodeConnectionCheckFuture, ConnectFuture {
+ /** */
+ final int nodeIdx;
+
+ /** */
+ volatile int done;
+
+ /** */
+ Map<Integer, Object> sesMeta;
+
+ /** */
+ private SocketChannel ch;
+
+ /**
+ * @param nodeIdx Node index.
+ */
+ SingleAddressConnectFuture(int nodeIdx) {
+ this.nodeIdx = nodeIdx;
+ }
+
+ /**
+ * @param addr Node address.
+ * @param rmtNodeId Id of node to open connection check session with.
+ */
+ public void init(InetSocketAddress addr, UUID rmtNodeId) {
+ boolean connect;
+
+ try {
+ ch = SocketChannel.open();
+
+ ch.configureBlocking(false);
+
+ ch.socket().setTcpNoDelay(true);
+ ch.socket().setKeepAlive(false);
+
+ connect = ch.connect(addr);
+ }
+ catch (Exception e) {
+ finish(false);
+
+ return;
+ }
+
+ if (!connect) {
+ sesMeta = new GridLeanMap<>(3);
+
+ // Set dummy key to identify connection-check outgoing connection.
+ sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, new ConnectionKey(rmtNodeId, -1, -1, true));
+ sesMeta.put(SES_FUT_META, this);
+
+ nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+ @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
+ if (fut.error() != null)
+ finish(false);
+ }
+ });
+ }
+ }
+
+ /**
+ *
+ */
+ @SuppressWarnings("unchecked")
+ void cancel() {
+ if (finish(false))
+ nioSrvr.cancelConnect(ch, sesMeta);
+ }
+
+ /** {@inheritDoc} */
+ public void onTimeout() {
+ cancel();
+ }
+
+ /** {@inheritDoc} */
+ public void onConnected(UUID rmtNodeId) {
+ finish(nodeId(nodeIdx).equals(rmtNodeId));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onNodeFailed() {
+ cancel();
+ }
+
+ /**
+ * @param res Result.
+ * @return {@code True} if result was set by this call.
+ */
+ public boolean finish(boolean res) {
+ if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
+ onStatusReceived(res);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param res Result.
+ */
+ void onStatusReceived(boolean res) {
+ receivedConnectionStatus(nodeIdx, res);
+ }
+ }
+
+ /**
+ *
+ */
+ private class MultipleAddressesConnectFuture implements ConnectFuture {
+ /** */
+ volatile int resCnt;
+
+ /** */
+ volatile SingleAddressConnectFuture[] futs;
+
+ /** */
+ final int nodeIdx;
+
+ /**
+ * @param nodeIdx Node index.
+ */
+ MultipleAddressesConnectFuture(int nodeIdx) {
+ this.nodeIdx = nodeIdx;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onNodeFailed() {
+ SingleAddressConnectFuture[] futs = this.futs;
+
+ for (int i = 0; i < futs.length; i++) {
+ ConnectFuture fut = futs[i];
+
+ if (fut != null)
+ fut.onNodeFailed();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ SingleAddressConnectFuture[] futs = this.futs;
+
+ for (int i = 0; i < futs.length; i++) {
+ ConnectFuture fut = futs[i];
+
+ if (fut != null)
+ fut.onTimeout();
+ }
+ }
+
+ /**
+ * @param addrs Node addresses.
+ * @param rmtNodeId Id of node to open connection check session with.
+ */
+ void init(Collection<InetSocketAddress> addrs, UUID rmtNodeId) {
+ SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()];
+
+ for (int i = 0; i < addrs.size(); i++) {
+ SingleAddressConnectFuture fut = new SingleAddressConnectFuture(nodeIdx) {
+ @Override void onStatusReceived(boolean res) {
+ receivedAddressStatus(res);
+ }
+ };
+
+ futs[i] = fut;
+ }
+
+ this.futs = futs;
+
+ int idx = 0;
+
+ for (InetSocketAddress addr : addrs) {
+ futs[idx++].init(addr, rmtNodeId);
+
+ if (resCnt == Integer.MAX_VALUE)
+ return;
+ }
+
+ // Close race.
+ if (done())
+ cancelFutures();
+ }
+
+ /**
+ * @return {@code True} if a successful connect was registered or all address connects have failed.
+ */
+ private boolean done() {
+ int resCnt0 = resCnt;
+
+ return resCnt0 == Integer.MAX_VALUE || resCnt0 == futs.length;
+ }
+
+ /**
+ *
+ */
+ private void cancelFutures() {
+ SingleAddressConnectFuture[] futs = this.futs;
+
+ if (futs != null) {
+ for (int i = 0; i < futs.length; i++) {
+ SingleAddressConnectFuture fut = futs[i];
+
+ fut.cancel();
+ }
+ }
+ }
+
+ /**
+ * @param res Result.
+ */
+ void receivedAddressStatus(boolean res) {
+ if (res) {
+ for (;;) {
+ int resCnt0 = resCnt;
+
+ if (resCnt0 == Integer.MAX_VALUE)
+ return;
+
+ if (connResCntUpdater.compareAndSet(this, resCnt0, Integer.MAX_VALUE)) {
+ receivedConnectionStatus(nodeIdx, true);
+
+ cancelFutures(); // Cancel others connects if they are still in progress.
+
+ return;
+ }
+ }
+ }
+ else {
+ for (;;) {
+ int resCnt0 = resCnt;
+
+ if (resCnt0 == Integer.MAX_VALUE)
+ return;
+
+ int resCnt1 = resCnt0 + 1;
+
+ if (connResCntUpdater.compareAndSet(this, resCnt0, resCnt1)) {
+ if (resCnt1 == futs.length)
+ receivedConnectionStatus(nodeIdx, false);
+
+ return;
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java
new file mode 100644
index 0000000..cbf27b5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.util.UUID;
+
+/**
+ * Tcp Communication Node Connection Check Future.
+ */
+public interface TcpCommunicationNodeConnectionCheckFuture {
+ /**
+ * @param nodeId Remote node ID.
+ */
+ public void onConnected(UUID nodeId);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index a0f9b75..f26ad33 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -30,12 +30,23 @@ import org.jetbrains.annotations.Nullable;
*/
public interface DiscoverySpiCustomMessage extends Serializable {
/**
- * Called when message passed the ring.
+ * Called when custom message has been handled by all nodes.
+ *
+ * @return Ack message or {@code null} if ack is not required.
*/
@Nullable public DiscoverySpiCustomMessage ackMessage();
/**
- * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
+ * @return {@code True} if message can be modified during listener notification. Changes will be sent to next nodes.
*/
public boolean isMutable();
+
+ /**
+ * Called on discovery coordinator node after listener is notified. If this method returns {@code true}
+ * then the message is not passed to other nodes; if after this method {@link #ackMessage()} returns a non-null ack
+ * message, it is sent to all nodes.
+ *
+ * @return {@code True} if message should not be sent to all nodes.
+ */
+ public boolean stopProcess();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
new file mode 100644
index 0000000..37aa323
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation is for all implementations of {@link DiscoverySpi} that support
+ * topology mutable {@link DiscoverySpiCustomMessage}s.
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface DiscoverySpiMutableCustomMessageSupport {
+ /**
+ * @return Whether or not target SPI supports mutable {@link DiscoverySpiCustomMessage}s.
+ */
+ public boolean value();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 2d9a314..f0a5186 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -55,6 +55,8 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -88,6 +90,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
@@ -103,6 +106,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessag
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryEnsureDelivery;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT;
@@ -223,7 +227,8 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean;
@IgniteSpiMultipleInstancesSupport(true)
@DiscoverySpiOrderSupport(true)
@DiscoverySpiHistorySupport(true)
-public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
+@DiscoverySpiMutableCustomMessageSupport(true)
+public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscoverySpi {
/** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
@@ -409,6 +414,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
/** */
protected IgniteSpiContext spiCtx;
+ /** */
+ private IgniteDiscoverySpiInternalListener internalLsnr;
+
/**
* Gets current SPI state.
*
@@ -473,6 +481,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+ IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr;
+
+ if (internalLsnr != null) {
+ if (!internalLsnr.beforeSendCustomEvent(this, log, msg))
+ return;
+ }
+
impl.sendCustomEvent(msg);
}
@@ -1559,6 +1574,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
OutputStream out,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
+ if (internalLsnr != null && msg instanceof TcpDiscoveryJoinRequestMessage)
+ internalLsnr.beforeJoin(locNode, log);
+
assert sock != null;
assert msg != null;
assert out != null;
@@ -2118,15 +2136,31 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
return ignite().configuration().getSslContextFactory() != null;
}
- /**
- * Force reconnect to cluster.
- *
- * @throws IgniteSpiException If failed.
- */
- public void reconnect() throws IgniteSpiException {
+ /** {@inheritDoc} */
+ public void clientReconnect() throws IgniteSpiException {
impl.reconnect();
}
+ /** {@inheritDoc} */
+ @Override public boolean knownNode(UUID nodeId) {
+ return getNode0(nodeId) != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean clientReconnectSupported() {
+ return !clientReconnectDisabled;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean supportsCommunicationFailureResolve() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* <strong>FOR TEST ONLY!!!</strong>
*/
@@ -2148,6 +2182,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
sndMsgLsnrs.add(lsnr);
}
+ /** {@inheritDoc} */
+ @Override public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
+ this.internalLsnr = lsnr;
+ }
+
/**
* <strong>FOR TEST ONLY!!!</strong>
*/
@@ -2185,7 +2224,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
* <p>
* This method is intended for test purposes only.
*/
- protected void simulateNodeFailure() {
+ public void simulateNodeFailure() {
impl.simulateNodeFailure();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 01534f7..55fe4e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -33,9 +33,9 @@ import java.util.UUID;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.managers.discovery.IgniteClusterNode;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -58,7 +58,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTE
* <strong>This class is not intended for public use</strong> and has been made
* <tt>public</tt> due to certain limitations of Java technology.
*/
-public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements ClusterNode,
+public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements IgniteClusterNode,
Comparable<TcpDiscoveryNode>, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -291,26 +291,14 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
return metrics;
}
- /**
- * Sets node metrics.
- *
- * @param metrics Node metrics.
- */
+ /** {@inheritDoc} */
public void setMetrics(ClusterMetrics metrics) {
assert metrics != null;
this.metrics = metrics;
}
- /**
- * Gets collections of cache metrics for this node. Note that node cache metrics are constantly updated
- * and provide up to date information about caches.
- * <p>
- * Cache metrics are updated with some delay which is directly related to metrics update
- * frequency. For example, by default the update will happen every {@code 2} seconds.
- *
- * @return Runtime metrics snapshots for this node.
- */
+ /** {@inheritDoc} */
public Map<Integer, CacheMetrics> cacheMetrics() {
if (metricsProvider != null) {
Map<Integer, CacheMetrics> cacheMetrics0 = metricsProvider.cacheMetrics();
@@ -323,11 +311,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
return cacheMetrics;
}
- /**
- * Sets node cache metrics.
- *
- * @param cacheMetrics Cache metrics.
- */
+ /** {@inheritDoc} */
public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics) {
this.cacheMetrics = cacheMetrics != null ? cacheMetrics : Collections.<Integer, CacheMetrics>emptyMap();
}
@@ -544,11 +528,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
return node;
}
- /**
- * Whether this node is cache client (see {@link IgniteConfiguration#isClientMode()}).
- *
- * @return {@code True if client}.
- */
+ /** {@inheritDoc} */
public boolean isCacheClient() {
if (!cacheCliInit) {
cacheCli = CU.clientNodeDirect(this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index f0f143d..6dc3d85 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -832,6 +832,7 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPar
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader$1
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader$2
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments
+org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap2
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap
@@ -1129,6 +1130,7 @@ org.apache.ignite.internal.processors.closure.GridClosureProcessor$T8
org.apache.ignite.internal.processors.closure.GridClosureProcessor$T9
org.apache.ignite.internal.processors.closure.GridClosureProcessor$TaskNoReduceAdapter
org.apache.ignite.internal.processors.closure.GridPeerDeployAwareTaskAdapter
+org.apache.ignite.internal.processors.cluster.ClusterNodeMetrics
org.apache.ignite.internal.processors.cluster.BaselineTopology
org.apache.ignite.internal.processors.cluster.BaselineTopologyHistory
org.apache.ignite.internal.processors.cluster.BaselineTopologyHistoryItem
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
index 900d4f5..eee47c7 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -124,12 +123,9 @@ public abstract class AffinityFunctionExcludeNeighborsAbstractSelfTest extends G
Affinity<Object> aff = g.affinity(DEFAULT_CACHE_NAME);
- List<TcpDiscoveryNode> top = new ArrayList<>();
+ List<ClusterNode> top = new ArrayList<>(g.cluster().nodes());
- for (ClusterNode node : g.cluster().nodes())
- top.add((TcpDiscoveryNode) node);
-
- Collections.sort(top);
+ Collections.sort((List)top);
assertEquals(grids, top.size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
index 4e4d75a..5eca7d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
@@ -120,6 +120,10 @@ public class FailureHandlerTriggeredTest extends GridCommonAbstractTest {
@Override public boolean isMutable() {
return false;
}
+
+ @Override public boolean stopProcess() {
+ return false;
+ }
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java
index 2328c84..141f4af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java
@@ -61,6 +61,9 @@ public class ClusterGroupHostsSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testForHosts() throws Exception {
+ if (!tcpDiscovery())
+ return;
+
Ignite ignite = grid();
assertEquals(1, ignite.cluster().forHost("h_1").nodes().size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
index 9df561a..99006d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
@@ -68,6 +68,8 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
if (i == 0)
ignite = g;
}
+
+ waitForTopology(NODES_CNT);
}
finally {
Ignition.setClientMode(false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
new file mode 100644
index 0000000..6e6b4a4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import junit.framework.AssertionFailedError;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks the cluster metrics reported by every started node: the total node count and the number of
+ * executed jobs, both cluster-wide and per node. Nodes are configured with a metrics update frequency
+ * of 500 (see {@link #getConfiguration(String)}).
+ */
+public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest {
+ /** IP finder instance installed into the discovery SPI of every node this test configures. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Value passed to {@code IgniteConfiguration.setClientMode()} for nodes configured after it is set. */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setMetricsUpdateFrequency(500);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /**
+ * Starts {@code NODES / 2} nodes with {@link #client} unset and another {@code NODES / 2} with it set,
+ * verifies through {@link #checkMetrics(int, Map)} that every node reports zero executed jobs for all
+ * nodes, and then calls a {@link DummyCallable} on each node from {@code srv0}.
+ * <p>
+ * NOTE(review): {@code expJobs} is updated to 1 for each node after its call, but
+ * {@code checkMetrics} is not invoked again afterwards, so the post-job metrics are never asserted.
+ * Confirm whether the final verification was left out intentionally.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMetrics() throws Exception {
+ int NODES = 6;
+
+ Ignite srv0 = startGridsMultiThreaded(NODES / 2);
+
+ client = true;
+
+ startGridsMultiThreaded(NODES / 2, NODES / 2);
+
+ Map<UUID, Integer> expJobs = new HashMap<>();
+
+ for (int i = 0; i < NODES; i++)
+ expJobs.put(nodeId(i), 0);
+
+ checkMetrics(NODES, expJobs);
+
+ for (int i = 0; i < NODES; i++) {
+ UUID nodeId = nodeId(i);
+
+ IgniteCompute c = srv0.compute(srv0.cluster().forNodeId(nodeId(i)));
+
+ c.call(new DummyCallable(null));
+
+ expJobs.put(nodeId, 1);
+ }
+ }
+
+ /**
+ * Single-shot verification. Asserts that {@code Ignition.allGrids()} and {@code expJobs} both have
+ * {@code expNodes} entries. Then, for every started node, asserts that its cluster metrics report
+ * {@code expNodes} total nodes and the sum of all {@code expJobs} values as total executed jobs, and
+ * that the metrics of the single-node group for each ID in {@code expJobs} report that node's
+ * expected job count.
+ *
+ * @param expNodes Expected nodes.
+ * @param expJobs Expected jobs number per node.
+ */
+ private void checkMetrics0(int expNodes, Map<UUID, Integer> expJobs) {
+ List<Ignite> nodes = Ignition.allGrids();
+
+ assertEquals(expNodes, nodes.size());
+ assertEquals(expNodes, expJobs.size());
+
+ int totalJobs = 0;
+
+ for (Integer c : expJobs.values())
+ totalJobs += c;
+
+ for (final Ignite ignite : nodes) {
+ ClusterMetrics m = ignite.cluster().metrics();
+
+ assertEquals(expNodes, m.getTotalNodes());
+ assertEquals(totalJobs, m.getTotalExecutedJobs());
+
+ for (Map.Entry<UUID, Integer> e : expJobs.entrySet()) {
+ UUID nodeId = e.getKey();
+
+ ClusterGroup g = ignite.cluster().forNodeId(nodeId);
+
+ ClusterMetrics nodeM = g.metrics();
+
+ assertEquals(e.getValue(), (Integer)nodeM.getTotalExecutedJobs());
+ }
+ }
+ }
+
+ /**
+ * Repeats {@link #checkMetrics0(int, Map)} through {@code GridTestUtils.waitForCondition} with a
+ * timeout argument of 5000, treating an {@code AssertionFailedError} as "condition not met yet".
+ * Afterwards runs {@code checkMetrics0} once more outside the predicate, so that a mismatch which
+ * still remains fails the test with the original assertion error.
+ *
+ * @param expNodes Expected nodes.
+ * @param expJobs Expected jobs number per node.
+ * @throws Exception If failed.
+ */
+ private void checkMetrics(final int expNodes, final Map<UUID, Integer> expJobs) throws Exception {
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ checkMetrics0(expNodes, expJobs);
+ }
+ catch (AssertionFailedError e) {
+ return false;
+ }
+
+ return true;
+ }
+ }, 5000);
+
+ checkMetrics0(expNodes, expJobs);
+ }
+
+ /**
+ * Callable that simply returns the byte array it was constructed with.
+ */
+ private static class DummyCallable implements IgniteCallable<Object> {
+ /** Payload returned by {@link #call()}; the test in this class passes {@code null}. */
+ private byte[] data;
+
+ /**
+ * @param data Data.
+ */
+ DummyCallable(byte[] data) {
+ this.data = data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ return data;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
new file mode 100644
index 0000000..46d9edc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Test callback for discovery SPI.
+ * <p>
+ * Allows a test to block/delay node join and the sending of custom discovery messages.
+ * <p>
+ * {@link #blockCustomEvtCls} and {@link #blockedMsgs} are guarded by {@link #mux}; every access to them
+ * must hold that monitor.
+ */
+public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListener {
+    /** Latch awaited in {@link #beforeJoin}; {@code null} until {@link #startBlockJoin()} is called. */
+    private volatile CountDownLatch joinLatch;
+
+    /** Classes of custom messages to intercept, {@code null} while blocking is off. Guarded by {@link #mux}. */
+    private Set<Class<?>> blockCustomEvtCls;
+
+    /** Monitor guarding the blocking state; also used to wake up {@link #waitCustomEvent()}. */
+    private final Object mux = new Object();
+
+    /** Intercepted messages waiting to be resent. Guarded by {@link #mux}. */
+    private final List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>();
+
+    /** SPI passed to the most recent {@link #beforeSendCustomEvent} call; used to resend blocked messages. */
+    private volatile DiscoverySpi spi;
+
+    /** Logger passed to the most recent {@link #beforeSendCustomEvent} call. */
+    private volatile IgniteLogger log;
+
+    /**
+     * Installs a fresh latch so that subsequent {@link #beforeJoin} calls wait until
+     * {@link #stopBlockJoin()} is invoked.
+     */
+    public void startBlockJoin() {
+        joinLatch = new CountDownLatch(1);
+    }
+
+    /**
+     * Releases threads waiting in {@link #beforeJoin}. Does nothing if {@link #startBlockJoin()} was never
+     * called; calling it repeatedly is harmless.
+     */
+    public void stopBlockJoin() {
+        CountDownLatch latch = joinLatch;
+
+        // Guard against cleanup code (e.g. a finally block) running before blocking was started.
+        if (latch != null)
+            latch.countDown();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeJoin(ClusterNode locNode, IgniteLogger log) {
+        try {
+            // Read the volatile field once so the null check and the await use the same latch.
+            CountDownLatch writeLatch0 = joinLatch;
+
+            if (writeLatch0 != null) {
+                log.info("Block join");
+
+                U.await(writeLatch0);
+            }
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Remembers the SPI and logger for a later resend. While blocking is enabled, a message whose
+     * {@code delegate} field holds an instance of one of the blocked classes is stored, waiters in
+     * {@link #waitCustomEvent()} are notified, and {@code false} is returned; otherwise {@code true}
+     * is returned.
+     */
+    @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg) {
+        this.spi = spi;
+        this.log = log;
+
+        synchronized (mux) {
+            if (blockCustomEvtCls != null) {
+                // The SPI-level message wraps the actual discovery message in its 'delegate' field.
+                DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate");
+
+                if (blockCustomEvtCls.contains(msg0.getClass())) {
+                    log.info("Block custom message: " + msg0);
+
+                    blockedMsgs.add(msg);
+
+                    mux.notifyAll();
+
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Starts intercepting custom messages of the given classes. Must not be called while previously
+     * blocked messages are still pending.
+     *
+     * @param cls0 First message class to block.
+     * @param blockCustomEvtCls Additional message classes to block.
+     */
+    public void blockCustomEvent(Class<?> cls0, Class<?> ... blockCustomEvtCls) {
+        synchronized (mux) {
+            assert blockedMsgs.isEmpty() : blockedMsgs;
+
+            this.blockCustomEvtCls = new HashSet<>();
+
+            this.blockCustomEvtCls.add(cls0);
+
+            Collections.addAll(this.blockCustomEvtCls, blockCustomEvtCls);
+        }
+    }
+
+    /**
+     * Waits until at least one custom message has been intercepted.
+     *
+     * @throws InterruptedException If interrupted.
+     */
+    public void waitCustomEvent() throws InterruptedException {
+        synchronized (mux) {
+            while (blockedMsgs.isEmpty())
+                mux.wait();
+        }
+    }
+
+    /**
+     * Turns message blocking off and resends every message intercepted so far through the SPI captured
+     * by {@link #beforeSendCustomEvent}.
+     */
+    public void stopBlockCustomEvents() {
+        List<DiscoverySpiCustomMessage> msgs;
+
+        // Must hold the same monitor as beforeSendCustomEvent()/blockCustomEvent(); locking 'this'
+        // here would let the snapshot-and-clear race with concurrent interception.
+        synchronized (mux) {
+            msgs = new ArrayList<>(blockedMsgs);
+
+            // Always disable the filter, even if no message has passed through the listener yet.
+            blockCustomEvtCls = null;
+
+            blockedMsgs.clear();
+        }
+
+        // A non-empty snapshot implies beforeSendCustomEvent() already stored both fields.
+        DiscoverySpi spi0 = spi;
+        IgniteLogger log0 = log;
+
+        for (DiscoverySpiCustomMessage msg : msgs) {
+            log0.info("Resend blocked message: " + msg);
+
+            spi0.sendCustomEvent(msg);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
index e6b678b..883d677 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.lang.IgniteProductVersion.fromString;
@@ -158,10 +159,10 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest {
final AtomicInteger cnt = new AtomicInteger();
- /** Joined nodes counter. */
+ // Joined nodes counter.
final CountDownLatch joinedCnt = new CountDownLatch(NODES_CNT);
- /** Left nodes counter. */
+ // Left nodes counter.
final CountDownLatch leftCnt = new CountDownLatch(NODES_CNT);
IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
@@ -171,7 +172,7 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest {
joinedCnt.countDown();
}
- else if (EVT_NODE_LEFT == evt.type()) {
+ else if (EVT_NODE_LEFT == evt.type() || EVT_NODE_FAILED == evt.type()) {
int i = cnt.decrementAndGet();
assert i >= 0;
@@ -185,7 +186,10 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest {
}
};
- ignite.events().localListen(lsnr, EVT_NODE_LEFT, EVT_NODE_JOINED);
+ int[] evts = tcpDiscovery() ? new int[]{EVT_NODE_LEFT, EVT_NODE_JOINED} :
+ new int[]{EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED};
+
+ ignite.events().localListen(lsnr, evts);
try {
for (int i = 0; i < NODES_CNT; i++)
@@ -242,6 +246,8 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest {
for (int i = 0; i < NODES_CNT; i++)
stopGrid(i);
+ waitForTopology(1);
+
final long topVer = discoMgr.topologyVersion();
assert topVer == topVer0 + NODES_CNT * 2 : "Unexpected topology version: " + topVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
index cd6b2c0..a8be541 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
@@ -259,6 +259,8 @@ public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest {
// Now we stop master grid.
stopGrid(lastGridIdx, true);
+ waitForTopology(GRID_CNT - 1);
+
// Release communication SPI wait latches. As master node is stopped, job worker will receive and exception.
for (int i = 0; i < lastGridIdx; i++)
((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).releaseWaitLatch();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
index f3a19aa..6824d51 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
@@ -187,6 +187,8 @@ public class GridJobStealingSelfTest extends GridCommonAbstractTest {
public void testProjectionPredicateInternalStealing() throws Exception {
final Ignite ignite3 = startGrid(3);
+ waitForTopology(3);
+
final UUID node1 = ignite1.cluster().localNode().id();
final UUID node3 = ignite3.cluster().localNode().id();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
index 66e9cf4..a04c38e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
@@ -75,8 +76,10 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
ignite2.events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
- assert evt.type() != EVT_NODE_FAILED :
- "Node1 did not exit gracefully.";
+ boolean tcpDiscovery = tcpDiscovery();
+
+ if (tcpDiscovery)
+ assert evt.type() != EVT_NODE_FAILED : "Node1 did not exit gracefully.";
if (evt instanceof DiscoveryEvent) {
// Local node can send METRICS_UPDATED event.
@@ -86,8 +89,14 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
((DiscoveryEvent) evt).eventNode().id() + ", expected=" + grid1LocNodeId +
", type=" + evt.type() + ']';
- if (evt.type() == EVT_NODE_LEFT)
- latch.countDown();
+ if (tcpDiscovery) {
+ if (evt.type() == EVT_NODE_LEFT)
+ latch.countDown();
+ }
+ else {
+ if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
+ latch.countDown();
+ }
}
return true;
@@ -96,7 +105,7 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
stopGrid(1);
- latch.await();
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
Collection<ClusterNode> top2 = ignite2.cluster().forRemotes().nodes();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
index 7e368cb..f71ffb0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
@@ -45,6 +45,8 @@ public class GridSelfTest extends ClusterGroupAbstractTest {
for (int i = 0; i < NODES_CNT; i++)
startGrid(i);
+
+ waitForTopology(NODES_CNT);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index fa9cc35..e68ea13 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -38,6 +39,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
@@ -143,6 +145,14 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
/**
* @param ignite Node.
+ * @return Discovery SPI.
+ */
+ protected static IgniteDiscoverySpi spi0(Ignite ignite) {
+ return ((IgniteDiscoverySpi)ignite.configuration().getDiscoverySpi());
+ }
+
+ /**
+ * @param ignite Node.
* @return Communication SPI.
*/
protected BlockTcpCommunicationSpi commSpi(Ignite ignite) {
@@ -185,16 +195,28 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
* @return Server node client connected to.
*/
protected Ignite clientRouter(Ignite client) {
- TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode();
+ if (tcpDiscovery()) {
+ TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode();
+
+ assertTrue(node.isClient());
+ assertNotNull(node.clientRouterNodeId());
- assertTrue(node.isClient());
- assertNotNull(node.clientRouterNodeId());
+ Ignite srv = G.ignite(node.clientRouterNodeId());
- Ignite srv = G.ignite(node.clientRouterNodeId());
+ assertNotNull(srv);
+
+ return srv;
+ }
+ else {
+ for (Ignite node : G.allGrids()) {
+ if (!node.cluster().localNode().isClient())
+ return node;
+ }
- assertNotNull(srv);
+ fail();
- return srv;
+ return null;
+ }
}
/**
@@ -251,15 +273,24 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
List<Ignite> clients, Ignite srv,
@Nullable Runnable disconnectedC)
throws Exception {
- final TestTcpDiscoverySpi srvSpi = spi(srv);
+ final IgniteDiscoverySpi srvSpi = spi0(srv);
final CountDownLatch disconnectLatch = new CountDownLatch(clients.size());
final CountDownLatch reconnectLatch = new CountDownLatch(clients.size());
log.info("Block reconnect.");
- for (Ignite client : clients)
- spi(client).writeLatch = new CountDownLatch(1);
+ List<DiscoverySpiTestListener> blockLsnrs = new ArrayList<>();
+
+ for (Ignite client : clients) {
+ DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+ lsnr.startBlockJoin();
+
+ blockLsnrs.add(lsnr);
+
+ spi0(client).setInternalListener(lsnr);
+ }
IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
@@ -291,8 +322,8 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
log.info("Allow reconnect.");
- for (Ignite client : clients)
- spi(client).writeLatch.countDown();
+ for (DiscoverySpiTestListener blockLsnr : blockLsnrs)
+ blockLsnr.stopBlockJoin();
waitReconnectEvent(log, reconnectLatch);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
index 06bde99..43da2d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -51,6 +52,7 @@ import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -99,7 +101,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- public void dataStructureOperationsTest() throws Exception {
+ private void dataStructureOperationsTest() throws Exception {
clientMode = true;
final Ignite client = startGrid(serverCount());
@@ -219,7 +221,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- public void cacheOperationsTest() throws Exception {
+ private void cacheOperationsTest() throws Exception {
clientMode = true;
final Ignite client = startGrid(serverCount());
@@ -537,7 +539,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- public void igniteOperationsTest() throws Exception {
+ private void igniteOperationsTest() throws Exception {
clientMode = true;
final Ignite client = startGrid(serverCount());
@@ -775,11 +777,11 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
throws Exception {
assertNotNull(client.cache(DEFAULT_CACHE_NAME));
- final TestTcpDiscoverySpi clientSpi = spi(client);
+ final IgniteDiscoverySpi clientSpi = spi0(client);
Ignite srv = clientRouter(client);
- TestTcpDiscoverySpi srvSpi = spi(srv);
+ DiscoverySpi srvSpi = spi0(srv);
final CountDownLatch disconnectLatch = new CountDownLatch(1);
@@ -787,7 +789,10 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
log.info("Block reconnect.");
- clientSpi.writeLatch = new CountDownLatch(1);
+ DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+ clientSpi.setInternalListener(lsnr);
+ lsnr.startBlockJoin();
final List<IgniteInternalFuture> futs = new ArrayList<>();
@@ -832,7 +837,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
log.info("Allow reconnect.");
- clientSpi.writeLatch.countDown();
+ lsnr.stopBlockJoin();
waitReconnectEvent(reconnectLatch);
@@ -857,7 +862,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
}
}
finally {
- clientSpi.writeLatch.countDown();
+ lsnr.stopBlockJoin();
for (IgniteInternalFuture fut : futs)
fut.cancel();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 00daf5f..d1e3ade 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -111,7 +111,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeq", 0, true);
@@ -144,7 +144,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true);
@@ -192,7 +192,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
BlockTcpCommunicationSpi commSpi = commSpi(srv);
@@ -253,7 +253,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
IgniteAtomicReference<String> clientAtomicRef = client.atomicReference("atomicRef", "1st value", true);
@@ -294,7 +294,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
final IgniteAtomicReference<String> clientAtomicRef =
client.atomicReference("atomicRefRemoved", "1st value", true);
@@ -347,7 +347,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
final IgniteAtomicReference<String> clientAtomicRef =
client.atomicReference("atomicRefInProg", "1st value", true);
@@ -414,7 +414,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStamped", 0, 0, true);
@@ -455,7 +455,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true);
@@ -506,7 +506,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedInProgress", 0, 0, true);
@@ -574,7 +574,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLong", 0, true);
@@ -605,7 +605,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongRmv", 0, true);
@@ -646,7 +646,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
BlockTcpCommunicationSpi commSpi = commSpi(srv);
@@ -701,7 +701,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
IgniteCountDownLatch clientLatch = client.countDownLatch("latch1", 3, false, true);
@@ -742,7 +742,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
IgniteSemaphore clientSemaphore = client.semaphore("semaphore1", 3, false, true);
@@ -789,7 +789,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
IgniteLock clientLock = client.reentrantLock("lock1", true, fair, true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 518e674..3cb82e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -67,6 +68,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
@@ -155,11 +157,11 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
IgniteEx client = startGrid(SRV_CNT);
- final TestTcpDiscoverySpi clientSpi = spi(client);
+ final IgniteDiscoverySpi clientSpi = spi0(client);
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
- TestTcpDiscoverySpi srvSpi = spi(srv);
+ DiscoverySpi srvSpi = ignite(0).configuration().getDiscoverySpi();
final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
@@ -188,7 +190,11 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
log.info("Block reconnect.");
- clientSpi.writeLatch = new CountDownLatch(1);
+ DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+ clientSpi.setInternalListener(lsnr);
+
+ lsnr.startBlockJoin();
final AtomicReference<IgniteInternalFuture> blockPutRef = new AtomicReference<>();
@@ -254,7 +260,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
log.info("Allow reconnect.");
- clientSpi.writeLatch.countDown();
+ lsnr.stopBlockJoin();
assertTrue(reconnectLatch.await(5000, MILLISECONDS));
@@ -319,7 +325,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
IgniteEx client = startGrid(SRV_CNT);
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
@@ -412,17 +418,21 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
final TransactionConcurrency txConcurrency,
final IgniteCache<Object, Object> cache)
throws Exception {
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
- final TestTcpDiscoverySpi clientSpi = spi(client);
- final TestTcpDiscoverySpi srvSpi = spi(srv);
+ final IgniteDiscoverySpi clientSpi = spi0(client);
+ final DiscoverySpi srvSpi = spi0(srv);
final CountDownLatch disconnectLatch = new CountDownLatch(1);
final CountDownLatch reconnectLatch = new CountDownLatch(1);
log.info("Block reconnect.");
- clientSpi.writeLatch = new CountDownLatch(1);
+ DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+ clientSpi.setInternalListener(lsnr);
+
+ lsnr.startBlockJoin();
client.events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
@@ -530,7 +540,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
assertTrue(putFailed.await(5000, MILLISECONDS));
- clientSpi.writeLatch.countDown();
+ lsnr.stopBlockJoin();
waitReconnectEvent(reconnectLatch);
@@ -604,9 +614,9 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
IgniteEx client = startGrid(SRV_CNT);
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
- TestTcpDiscoverySpi srvSpi = spi(srv);
+ DiscoverySpi srvSpi = spi0(srv);
TestCommunicationSpi coordCommSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi();
@@ -691,7 +701,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try {
- Ignition.start(optimize(getConfiguration(getTestIgniteInstanceName(SRV_CNT))));
+ startGrid(optimize(getConfiguration(getTestIgniteInstanceName(SRV_CNT))));
// Commented due to IGNITE-4473, because
// IgniteClientDisconnectedException won't
@@ -722,7 +732,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
}
});
- TestTcpDiscoverySpi srvSpi = spi(srv);
+ DiscoverySpi srvSpi = spi0(srv);
try {
if (!joinLatch.await(10_000, MILLISECONDS)) {
@@ -1256,30 +1266,35 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
*
*/
static class TestClass1 implements Serializable {
+ // No-op.
}
/**
*
*/
static class TestClass2 implements Serializable {
+ // No-op.
}
/**
*
*/
static class TestClass3 implements Serializable {
+ // No-op.
}
/**
*
*/
static class TestClass4 implements Serializable {
+ // No-op.
}
/**
*
*/
static class TestClass5 implements Serializable {
+ // No-op.
}
/**
@@ -1294,11 +1309,11 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
Class<?> msgToBlock,
final IgniteInClosure<IgniteCache<Object, Object>> c)
throws Exception {
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
final UUID id = client.localNode().id();
- TestTcpDiscoverySpi srvSpi = spi(srv);
+ DiscoverySpi srvSpi = spi0(srv);
final IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
index 3f0e33d..5be59b0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -180,7 +180,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
private void serverNodeReconnect(CollectionConfiguration colCfg) throws Exception {
final Ignite client = grid(serverCount());
- final Ignite srv = clientRouter(client);
+ final Ignite srv = ignite(0);
assertNotNull(srv.queue("q", 0, colCfg));
assertNotNull(srv.set("s", colCfg));
@@ -201,7 +201,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
final String setName = "set-" + colCfg.getAtomicityMode();
@@ -235,7 +235,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
assertTrue(client.cluster().localNode().isClient());
- final Ignite srv = clientRouter(client);
+ final Ignite srv = ignite(0);
final String setName = "set-rm-" + colCfg.getAtomicityMode();
@@ -281,7 +281,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
assertTrue(client.cluster().localNode().isClient());
- final Ignite srv = clientRouter(client);
+ final Ignite srv = ignite(0);
final String setName = "set-in-progress-" + colCfg.getAtomicityMode();
@@ -347,7 +347,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
final String setName = "queue-" + colCfg.getAtomicityMode();
@@ -379,7 +379,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
final String setName = "queue-rmv" + colCfg.getAtomicityMode();
@@ -423,7 +423,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
final String setName = "queue-rmv" + colCfg.getAtomicityMode();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
index cce0c7e..57d3188 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
@@ -49,7 +49,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
IgniteCache<Integer, Integer> cache = client.getOrCreateCache("test-cache");
@@ -103,7 +103,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
BlockTcpCommunicationSpi commSpi = commSpi(srv);
@@ -152,7 +152,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
BlockTcpCommunicationSpi commSpi = commSpi(srv);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
index ca0d889..d68fc1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
@@ -61,9 +62,9 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
- TestTcpDiscoverySpi srvSpi = spi(srv);
+ IgniteDiscoverySpi srvSpi = spi0(srv);
EventListener lsnr = new EventListener();
@@ -133,9 +134,9 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
assertTrue(client.cluster().localNode().isClient());
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
- TestTcpDiscoverySpi srvSpi = spi(srv);
+ IgniteDiscoverySpi srvSpi = spi0(srv);
final String topic = "testTopic";
@@ -309,9 +310,9 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
CacheEventListener lsnr)
throws Exception
{
- Ignite srv = clientRouter(client);
+ Ignite srv = ignite(0);
- TestTcpDiscoverySpi srvSpi = spi(srv);
+ IgniteDiscoverySpi srvSpi = spi0(srv);
final CountDownLatch reconnectLatch = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
index c071ee2..6e77742 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.Event;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
@@ -64,20 +65,23 @@ public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconne
nodeCnt.put(1, 1);
nodeCnt.put(2, 2);
nodeCnt.put(3, 3);
- nodeCnt.put(4, 4);
- for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {
- Collection<ClusterNode> nodes = cluster.topology(e.getKey());
+ if (tcpDiscovery()) {
+ nodeCnt.put(4, 4);
- assertNotNull("No nodes for topology: " + e.getKey(), nodes);
- assertEquals((int)e.getValue(), nodes.size());
+ for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {
+ Collection<ClusterNode> nodes = cluster.topology(e.getKey());
+
+ assertNotNull("No nodes for topology: " + e.getKey(), nodes);
+ assertEquals((int)e.getValue(), nodes.size());
+ }
}
ClusterNode locNode = cluster.localNode();
assertEquals(topVer, locNode.order());
- TestTcpDiscoverySpi srvSpi = spi(clientRouter(client));
+ DiscoverySpi srvSpi = ignite(0).configuration().getDiscoverySpi();
final CountDownLatch reconnectLatch = new CountDownLatch(1);
@@ -112,7 +116,11 @@ public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconne
assertEquals(topVer, locNode.order());
assertEquals(topVer, cluster.topologyVersion());
- nodeCnt.put(5, 3);
+ if (tcpDiscovery())
+ nodeCnt.put(5, 3);
+ else
+ nodeCnt.clear();
+
nodeCnt.put(6, 4);
for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {