You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/12 10:30:03 UTC
[62/67] [abbrv] incubator-ignite git commit: # Merge remote-tracking
branch 'remotes/origin/master' into ignite-21
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1911b5b,0b3330a..2c7f8be
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@@ -1758,35 -1428,22 +1758,35 @@@ public class TcpCommunicationSpi extend
GridCommunicationClient client = null;
try {
- client = reserveClient(node);
+ boolean retry;
+
+ do {
+ client = reserveClient(node);
- UUID nodeId = null;
+ UUID nodeId = null;
- if (!client.async() && !getSpiContext().localNode().version().equals(node.version()))
- nodeId = node.id();
+ if (!client.async() && !getSpiContext().localNode().version().equals(node.version()))
+ nodeId = node.id();
- client.sendMessage(nodeId, msg);
+ retry = client.sendMessage(nodeId, msg);
- client.release();
+ client.release();
- client = null;
+ client = null;
+
+ if (!retry)
+ sentMsgsCnt.increment();
+ else {
+ ClusterNode node0 = getSpiContext().node(node.id());
- sentMsgsCnt.increment();
+ if (node0 == null)
+ throw new GridException("Failed to send message to remote node " +
+ "(node has left the grid): " + node.id());
+ }
+ }
+ while (retry);
}
- catch (GridException e) {
+ catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
}
finally {
@@@ -2196,19 -1812,12 +2196,19 @@@
* Performs handshake in timeout-safe way.
*
* @param client Client.
+ * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
* @param rmtNodeId Remote node.
* @param timeout Timeout for handshake.
- * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
+ * @throws GridException If handshake failed or wasn't completed withing timeout.
+ * @return Handshake response.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
- private <T> void safeHandshake(T client, UUID rmtNodeId, long timeout) throws IgniteCheckedException {
+ private <T> long safeHandshake(
+ T client,
+ @Nullable GridNioRecoveryDescriptor recovery,
+ UUID rmtNodeId,
+ long timeout
- ) throws GridException {
++ ) throws IgniteCheckedException {
HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
sockTimeoutWorker.addTimeoutObject(obj);
@@@ -2373,56 -1901,8 +2373,56 @@@
return S.toString(TcpCommunicationSpi.class, this);
}
+ /**
+ *
+ */
+ private static class ClientKey {
+ /** */
+ private UUID nodeId;
+
+ /** */
+ private long order;
+
+ /**
+ * @param nodeId Node ID.
+ * @param order Node order.
+ */
+ private ClientKey(UUID nodeId, long order) {
+ this.nodeId = nodeId;
+ this.order = order;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+
+ if (obj == null || getClass() != obj.getClass())
+ return false;
+
+ ClientKey other = (ClientKey)obj;
+
+ return order == other.order && nodeId.equals(other.nodeId);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = nodeId.hashCode();
+
+ res = 31 * res + (int)(order ^ (order >>> 32));
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ClientKey.class, this);
+ }
+ }
+
/** Internal exception class for proper timeout handling. */
- private static class HandshakeTimeoutException extends GridException {
+ private static class HandshakeTimeoutException extends IgniteCheckedException {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java
index 5b0db53,6404002..6c9631e
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridCommunicationClient.java
@@@ -86,10 -86,9 +86,10 @@@ public interface GridCommunicationClien
/**
* @param nodeId Node ID (provided only if versions of local and remote nodes are different).
* @param msg Message to send.
- * @throws GridException If failed.
+ * @throws IgniteCheckedException If failed.
+ * @return {@code True} if should try to resend message.
*/
- boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws GridException;
- void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws IgniteCheckedException;
++ boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg) throws IgniteCheckedException;
/**
* @param timeout Timeout.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFinishedFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java
index a3ab1ef,1754db7..19de132
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFuture.java
@@@ -27,12 -28,12 +28,12 @@@ public interface GridNioFuture<R>
* returns operation result.
*
* @return Operation result.
- * @throws GridInterruptedException Subclass of {@link IgniteCheckedException} thrown if the wait was interrupted.
- * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link IgniteCheckedException} throws if operation was cancelled.
+ * @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted.
+ * @throws IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled.
- * @throws GridException If operation failed.
+ * @throws IgniteCheckedException If operation failed.
* @throws IOException If IOException occurred while performing operation.
*/
- public R get() throws IOException, GridException;
+ public R get() throws IOException, IgniteCheckedException;
/**
* Synchronously waits for completion of the operation for
@@@ -41,13 -42,13 +42,13 @@@
*
* @param timeout The maximum time to wait in milliseconds.
* @return Operation result.
- * @throws GridInterruptedException Subclass of {@link IgniteCheckedException} thrown if the wait was interrupted.
- * @throws org.apache.ignite.lang.IgniteFutureTimeoutException Subclass of {@link IgniteCheckedException} thrown if the wait was timed out.
- * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link IgniteCheckedException} throws if operation was cancelled.
+ * @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted.
+ * @throws IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out.
+ * @throws IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled.
- * @throws GridException If operation failed.
+ * @throws IgniteCheckedException If operation failed.
* @throws IOException If IOException occurred while performing operation.
*/
- public R get(long timeout) throws IOException, GridException;
+ public R get(long timeout) throws IOException, IgniteCheckedException;
/**
* Synchronously waits for completion of the operation for
@@@ -56,13 -57,13 +57,13 @@@
* @param timeout The maximum time to wait.
* @param unit The time unit of the {@code timeout} argument.
* @return Operation result.
- * @throws GridInterruptedException Subclass of {@link IgniteCheckedException} thrown if the wait was interrupted.
- * @throws org.apache.ignite.lang.IgniteFutureTimeoutException Subclass of {@link IgniteCheckedException} thrown if the wait was timed out.
- * @throws org.apache.ignite.lang.IgniteFutureCancelledException Subclass of {@link IgniteCheckedException} throws if operation was cancelled.
+ * @throws GridInterruptedException Subclass of {@link GridException} thrown if the wait was interrupted.
+ * @throws IgniteFutureTimeoutException Subclass of {@link GridException} thrown if the wait was timed out.
+ * @throws IgniteFutureCancelledException Subclass of {@link GridException} throws if operation was cancelled.
- * @throws GridException If operation failed.
+ * @throws IgniteCheckedException If operation failed.
* @throws IOException If IOException occurred while performing operation.
*/
- public R get(long timeout, TimeUnit unit) throws IOException, GridException;
+ public R get(long timeout, TimeUnit unit) throws IOException, IgniteCheckedException;
/**
* Cancels this future.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java
index 7ab2e14,6c5a6bc..ee84796
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioFutureImpl.java
@@@ -118,10 -119,10 +119,10 @@@ public class GridNioFutureImpl<R> exten
* @param nanosTimeout Timeout (nanoseconds).
* @return Result.
* @throws InterruptedException If interrupted.
- * @throws org.apache.ignite.lang.IgniteFutureTimeoutException If timeout reached before computation completed.
+ * @throws IgniteFutureTimeoutException If timeout reached before computation completed.
- * @throws GridException If error occurred.
+ * @throws IgniteCheckedException If error occurred.
*/
- @Nullable protected R get0(long nanosTimeout) throws InterruptedException, GridException {
+ @Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException {
if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout))
throw new IgniteFutureTimeoutException("Timeout was reached before computation completed.");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioServer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java
index 505c788,8777405..501e7ee
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridShmemCommunicationClient.java
@@@ -103,10 -103,10 +103,10 @@@ public class GridShmemCommunicationClie
}
/** {@inheritDoc} */
- @Override public synchronized void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
+ @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
- throws GridException {
+ throws IgniteCheckedException {
if (closed())
- throw new GridException("Communication client was closed: " + this);
+ throw new IgniteCheckedException("Communication client was closed: " + this);
assert writeBuf.hasArray();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java
index fbca363,a20ea24..632ce35
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpCommunicationClient.java
@@@ -182,10 -182,10 +182,10 @@@ public class GridTcpCommunicationClien
}
/** {@inheritDoc} */
- @Override public void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
+ @Override public boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
- throws GridException {
+ throws IgniteCheckedException {
if (closed())
- throw new GridException("Client was closed: " + this);
+ throw new IgniteCheckedException("Client was closed: " + this);
assert writeBuf.hasArray();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java
index 55997d3,49053d3..3d8668d
--- a/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/nio/GridTcpNioCommunicationClient.java
@@@ -96,8 -98,8 +95,8 @@@ public class GridTcpNioCommunicationCli
}
/** {@inheritDoc} */
- @Override public void sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
+ @Override public boolean sendMessage(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg)
- throws GridException {
+ throws IgniteCheckedException {
// Node ID is never provided in asynchronous send mode.
assert nodeId == null;
@@@ -108,23 -113,9 +107,23 @@@
fut.get();
}
catch (IOException e) {
- throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
+ if (log.isDebugEnabled())
+ log.debug("Failed to send message [client=" + this + ", err=" +e + ']');
+
+ return true;
+ }
- catch (GridException e) {
++ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send message [client=" + this + ", err=" +e + ']');
+
+ if (e.getCause() instanceof IOException)
+ return true;
+ else
- throw new GridException("Failed to send message [client=" + this + ']', e);
++ throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
}
}
+
+ return false;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 86d68a7,0dfb5fd..07b5059
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@@ -9,13 -9,13 +9,13 @@@
package org.apache.ignite.spi.communication;
-import mx4j.tools.adaptor.http.*;
+ import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
- import org.gridgain.grid.*;
import org.gridgain.grid.util.direct.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.testframework.*;
-import org.gridgain.testframework.config.*;
import org.gridgain.testframework.junits.*;
import org.gridgain.testframework.junits.spi.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 0d650fb,80f2226..1889cd2
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@@ -9,12 -9,13 +9,13 @@@
package org.apache.ignite.spi.communication.tcp;
-import mx4j.tools.adaptor.http.*;
+ import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
- import org.gridgain.grid.*;
import org.apache.ignite.spi.communication.*;
+ import org.gridgain.grid.*;
import org.gridgain.grid.util.direct.*;
+import org.gridgain.grid.util.lang.*;
import org.gridgain.grid.util.nio.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/test/java/org/gridgain/grid/kernal/managers/GridManagerStopSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a735398/modules/core/src/test/java/org/gridgain/grid/util/nio/impl/GridNioFilterChainSelfTest.java
----------------------------------------------------------------------