You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/09/07 22:46:51 UTC
[24/50] [abbrv] ignite git commit: # ignite-901 client reconnect
support
# ignite-901 client reconnect support
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57ac2b3b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57ac2b3b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57ac2b3b
Branch: refs/heads/ignite-788-dev
Commit: 57ac2b3bf437c037904624d411fd89b28b22c944
Parents: aef4063
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 16 13:06:04 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 16 13:06:05 2015 +0300
----------------------------------------------------------------------
.../IgniteClientDisconnectedException.java | 61 +
.../java/org/apache/ignite/IgniteCluster.java | 5 +
.../apache/ignite/internal/GridComponent.java | 18 +
.../ignite/internal/GridJobSiblingImpl.java | 2 +-
.../ignite/internal/GridKernalContext.java | 5 +
.../ignite/internal/GridKernalContextImpl.java | 31 +-
.../ignite/internal/GridKernalGateway.java | 46 +-
.../ignite/internal/GridKernalGatewayImpl.java | 85 +-
.../apache/ignite/internal/GridKernalState.java | 3 +
.../ignite/internal/GridPluginComponent.java | 11 +
...gniteClientDisconnectedCheckedException.java | 49 +
.../apache/ignite/internal/IgniteKernal.java | 222 +++-
.../cluster/IgniteClusterAsyncImpl.java | 5 +
.../internal/cluster/IgniteClusterImpl.java | 18 +
.../internal/managers/GridManagerAdapter.java | 19 +-
.../deployment/GridDeploymentCommunication.java | 2 +-
.../deployment/GridDeploymentManager.java | 95 +-
.../discovery/GridDiscoveryManager.java | 163 ++-
.../processors/GridProcessorAdapter.java | 11 +
.../affinity/GridAffinityAssignmentCache.java | 26 +-
.../cache/CacheOsConflictResolutionManager.java | 6 +
.../cache/DynamicCacheChangeBatch.java | 17 +
.../processors/cache/GridCacheAdapter.java | 25 +-
.../cache/GridCacheAffinityManager.java | 21 +-
.../cache/GridCacheConcurrentMap.java | 15 +-
.../processors/cache/GridCacheGateway.java | 116 +-
.../processors/cache/GridCacheIoManager.java | 8 +
.../processors/cache/GridCacheManager.java | 6 +
.../cache/GridCacheManagerAdapter.java | 6 +
.../processors/cache/GridCacheMvccManager.java | 41 +-
.../GridCachePartitionExchangeManager.java | 81 +-
.../processors/cache/GridCachePreloader.java | 5 +
.../cache/GridCachePreloaderAdapter.java | 5 +
.../processors/cache/GridCacheProcessor.java | 311 ++++-
.../cache/GridCacheSharedContext.java | 113 +-
.../cache/GridCacheSharedManager.java | 11 +-
.../cache/GridCacheSharedManagerAdapter.java | 20 +-
.../processors/cache/GridCacheUtils.java | 11 +
.../processors/cache/IgniteCacheFutureImpl.java | 5 +
.../processors/cache/IgniteCacheProxy.java | 2 +-
.../CacheDataStructuresManager.java | 35 +
.../distributed/GridCacheTxFinishSync.java | 46 +
.../distributed/dht/GridDhtCacheAdapter.java | 14 +-
.../dht/GridDhtPartitionTopologyImpl.java | 24 +
.../dht/GridPartitionedGetFuture.java | 13 +-
.../dht/preloader/GridDhtPreloader.java | 16 +-
.../distributed/near/GridNearCacheAdapter.java | 8 +
.../distributed/near/GridNearGetFuture.java | 13 +-
.../cache/dr/GridOsCacheDrManager.java | 7 +-
.../query/GridCacheDistributedQueryManager.java | 22 +
.../cache/query/GridCacheQueryAdapter.java | 11 +-
.../query/GridCacheQueryFutureAdapter.java | 2 +-
.../continuous/CacheContinuousQueryHandler.java | 5 +
.../transactions/IgniteTransactionsImpl.java | 59 +-
.../cache/transactions/IgniteTxManager.java | 19 +-
.../transactions/TransactionProxyImpl.java | 2 +-
.../cache/version/GridCacheVersionManager.java | 9 +-
.../clock/GridClockSyncProcessor.java | 6 +-
.../processors/cluster/ClusterProcessor.java | 11 +
.../continuous/GridContinuousHandler.java | 9 +-
.../continuous/GridContinuousProcessor.java | 127 +-
.../datastreamer/DataStreamProcessor.java | 24 +-
.../datastreamer/DataStreamerImpl.java | 90 +-
.../datastructures/DataStructuresProcessor.java | 33 +-
.../datastructures/GridCacheAtomicLongImpl.java | 33 +-
.../GridCacheAtomicReferenceImpl.java | 34 +-
.../GridCacheAtomicSequenceImpl.java | 33 +-
.../GridCacheAtomicStampedImpl.java | 33 +-
.../GridCacheCountDownLatchImpl.java | 51 +-
.../datastructures/GridCacheRemovable.java | 6 +-
.../datastructures/GridCacheSetImpl.java | 15 +-
.../datastructures/GridCacheSetProxy.java | 47 +-
.../processors/job/GridJobProcessor.java | 2 +-
.../internal/processors/job/GridJobWorker.java | 2 +-
.../processors/query/GridQueryIndexing.java | 7 +
.../processors/query/GridQueryProcessor.java | 6 +
.../service/GridServiceProcessor.java | 45 +-
.../processors/service/GridServiceProxy.java | 13 +-
.../processors/task/GridTaskProcessor.java | 55 +-
.../processors/task/GridTaskWorker.java | 59 +-
.../ignite/internal/util/IgniteUtils.java | 28 +
.../shmem/IpcSharedMemoryClientEndpoint.java | 5 +-
.../ignite/internal/util/lang/GridFunc.java | 2 +
.../java/org/apache/ignite/spi/IgniteSpi.java | 15 +
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 37 +-
.../communication/tcp/TcpCommunicationSpi.java | 354 ++++--
.../spi/discovery/DiscoverySpiDataExchange.java | 3 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 408 ++++--
.../ignite/spi/discovery/tcp/ServerImpl.java | 134 +-
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 9 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 33 +-
.../tcp/internal/TcpDiscoveryNode.java | 19 +
.../messages/TcpDiscoveryAbstractMessage.java | 3 +
.../messages/TcpDiscoveryClientAckResponse.java | 64 +
.../messages/TcpDiscoveryHandshakeResponse.java | 14 +
.../spi/swapspace/file/FileSwapSpaceSpi.java | 2 +-
.../internal/GridUpdateNotifierSelfTest.java | 15 +-
.../IgniteClientReconnectAbstractTest.java | 363 ++++++
.../IgniteClientReconnectApiExceptionTest.java | 846 ++++++++++++
.../IgniteClientReconnectAtomicsTest.java | 672 ++++++++++
.../IgniteClientReconnectCacheTest.java | 1202 ++++++++++++++++++
.../IgniteClientReconnectCollectionsTest.java | 443 +++++++
.../IgniteClientReconnectComputeTest.java | 192 +++
...eClientReconnectContinuousProcessorTest.java | 372 ++++++
...IgniteClientReconnectDiscoveryStateTest.java | 123 ++
...niteClientReconnectFailoverAbstractTest.java | 231 ++++
.../IgniteClientReconnectFailoverTest.java | 212 +++
.../IgniteClientReconnectServicesTest.java | 260 ++++
.../internal/IgniteClientReconnectStopTest.java | 106 ++
.../IgniteClientReconnectStreamerTest.java | 233 ++++
.../IgniteSlowClientDetectionSelfTest.java | 1 +
.../GridDeploymentManagerStopSelfTest.java | 7 +
.../IgniteCacheAbstractStopBusySelfTest.java | 2 +-
.../cache/IgniteCacheDynamicStopSelfTest.java | 6 +-
.../IgniteTxExceptionAbstractSelfTest.java | 1 +
.../IgniteCacheSystemTransactionsSelfTest.java | 2 +-
.../GridCacheReplicatedInvalidateSelfTest.java | 3 +-
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 562 +++++++-
.../multijvm/IgniteClusterProcessProxy.java | 5 +
.../IgniteClientReconnectTestSuite.java | 48 +
.../processors/query/h2/IgniteH2Indexing.java | 5 +
.../query/h2/twostep/GridMergeIndex.java | 45 +-
.../h2/twostep/GridReduceQueryExecutor.java | 70 +-
...ClientReconnectCacheQueriesFailoverTest.java | 225 ++++
.../cache/IgniteClientReconnectQueriesTest.java | 427 +++++++
...dCacheAbstractReduceFieldsQuerySelfTest.java | 4 +
.../IgniteCacheWithIndexingTestSuite.java | 1 +
128 files changed, 9751 insertions(+), 815 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
new file mode 100644
index 0000000..2089db0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Exception thrown from Ignite API when the client node is disconnected from the cluster.
+ */
+public class IgniteClientDisconnectedException extends IgniteException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final IgniteFuture<?> reconnectFut;
+
+ /**
+ * @param reconnectFut Reconnect future.
+ * @param msg Error message.
+ */
+ public IgniteClientDisconnectedException(IgniteFuture<?> reconnectFut, String msg) {
+ this(reconnectFut, msg, null);
+ }
+
+ /**
+ * @param reconnectFut Reconnect future.
+ * @param msg Error message.
+ * @param cause Optional nested exception (can be {@code null}).
+ */
+ public IgniteClientDisconnectedException(
+ IgniteFuture<?> reconnectFut,
+ String msg,
+ @Nullable Throwable cause) {
+ super(msg, cause);
+
+ this.reconnectFut = reconnectFut;
+ }
+
+ /**
+ * @return Future that will be completed when the client has reconnected.
+ */
+ public IgniteFuture<?> reconnectFuture() {
+ return reconnectFut;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
index 72be3fb..d3ce0e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
@@ -328,6 +328,11 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport {
*/
public void resetMetrics();
+ /**
+ * @return Future that will be completed when the client has reconnected.
+ */
+ @Nullable public IgniteFuture<?> clientReconnectFuture();
+
/** {@inheritDoc} */
@Override public IgniteCluster withAsync();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index fb227cd..65e0644 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.spi.*;
import org.jetbrains.annotations.*;
@@ -87,6 +88,7 @@ public interface GridComponent {
/**
* Receives discovery data object from remote nodes (called
* on new node during discovery process).
+ *
* @param joiningNodeId Joining node ID.
* @param rmtNodeId Remote node ID for which data is provided.
* @param data Discovery data object or {@code null} if nothing was
@@ -116,4 +118,20 @@ public interface GridComponent {
* @return Unique component type for discovery data exchange.
*/
@Nullable public DiscoveryDataExchangeType discoveryDataType();
+
+ /**
+ * Client disconnected callback.
+ *
+ * @param reconnectFut Reconnect future.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException;
+
+ /**
+ * Client reconnected callback.
+ *
+ * @param clusterRestarted Cluster restarted flag.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
index 62adf52..b4e0f01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
@@ -167,7 +167,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
}
catch (IgniteCheckedException e) {
// Avoid stack trace for left nodes.
- if (ctx.discovery().node(node.id()) != null && ctx.discovery().pingNode(node.id()))
+ if (ctx.discovery().node(node.id()) != null && ctx.discovery().pingNodeNoError(node.id()))
U.error(ctx.log(GridJobSiblingImpl.class), "Failed to send cancel request to node " +
"[nodeId=" + node.id() + ", ses=" + ses + ']', e);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index d6542f3..f4da333 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -557,4 +557,9 @@ public interface GridKernalContext extends Iterable<GridComponent> {
* @return {@code True} if local node is client node (has flag {@link IgniteConfiguration#isClientMode()} set).
*/
public boolean clientNode();
+
+ /**
+ * @return {@code True} if local node is in disconnected state.
+ */
+ public boolean clientDisconnected();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 8abb135..fd8b50c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal;
import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.managers.checkpoint.*;
import org.apache.ignite.internal.managers.collision.*;
@@ -303,6 +304,12 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** Marshaller context. */
private MarshallerContextImpl marshCtx;
+ /** */
+ private ClusterNode locNode;
+
+ /** */
+ private volatile boolean disconnected;
+
/**
* No-arg constructor is required by externalization.
*/
@@ -325,6 +332,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
* @param mgmtExecSvc Management executor service.
* @param igfsExecSvc IGFS executor service.
* @param restExecSvc REST executor service.
+ * @param plugins Plugin providers.
* @throws IgniteCheckedException In case of error.
*/
@SuppressWarnings("TypeMayBeWeakened")
@@ -503,7 +511,13 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** {@inheritDoc} */
@Override public UUID localNodeId() {
- return cfg.getNodeId();
+ if (locNode != null)
+ return locNode.id();
+
+ if (discoMgr != null)
+ locNode = discoMgr.localNode();
+
+ return locNode != null ? locNode.id() : config().getNodeId();
}
/** {@inheritDoc} */
@@ -903,6 +917,21 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public boolean clientDisconnected() {
+ if (locNode == null)
+ locNode = discoMgr != null ? discoMgr.localNode() : null;
+
+ return locNode != null ? (locNode.isClient() && disconnected) : false;
+ }
+
+ /**
+ * @param disconnected Disconnected flag.
+ */
+ void disconnected(boolean disconnected) {
+ this.disconnected = disconnected;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridKernalContextImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
index 0156136..1d50aa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
+import org.jetbrains.annotations.*;
/**
* This interface guards access to implementations of public methods that access kernal
@@ -39,22 +41,6 @@ import org.apache.ignite.internal.util.tostring.*;
@GridToStringExclude
public interface GridKernalGateway {
/**
- * Performs light-weight check on the kernal state at the moment of this call.
- * <p>
- * This method should only be used when the kernal state should be checked just once
- * at the beginning of the method and the fact that <b>kernal state can change in the middle
- * of such method's execution</b> should not matter.
- * <p>
- * For example, when a method returns a constant value its implementation doesn't depend
- * on the kernal being valid throughout its execution. In such case it is enough to check
- * the kernal's state just once at the beginning of this method to provide consistent behavior
- * of the API without incurring overhead of <code>lock-based</code> guard methods.
- *
- * @throws IllegalStateException Thrown in case when no kernal calls are allowed.
- */
- public void lightCheck() throws IllegalStateException;
-
- /**
* Should be called on entering every kernal related call
* <b>originated directly or indirectly via public API</b>.
* <p>
@@ -113,31 +99,29 @@ public interface GridKernalGateway {
public void writeUnlock();
/**
- * Adds stop listener. Note that the identity set will be used to store listeners for
- * performance reasons. Futures can register a listener to be notified when they need to
- * be internally interrupted.
+ * Gets user stack trace through the first call of grid public API.
*
- * @param lsnr Listener to add.
+ * @return User stack trace.
*/
- public void addStopListener(Runnable lsnr);
+ public String userStackTrace();
/**
- * Removes previously added stop listener.
- *
- * @param lsnr Listener to remove.
+ * @param timeout Timeout.
+ * @return {@code True} if write lock has been acquired.
+ * @throws InterruptedException If interrupted.
*/
- public void removeStopListener(Runnable lsnr);
+ public boolean tryWriteLock(long timeout) throws InterruptedException;
/**
- * Gets user stack trace through the first call of grid public API.
+ * Disconnected callback.
+ *
+ * @return Reconnect future.
*/
- public String userStackTrace();
+ @Nullable public GridFutureAdapter<?> onDisconnected();
/**
- * @param timeout Timeout.
- * @return {@code True} if write lock has been acquired.
- * @throws InterruptedException If interrupted.
+ * Reconnected callback.
*/
- public boolean tryWriteLock(long timeout) throws InterruptedException;
+ public void onReconnected();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index 35bbbed..f6a9e51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -17,13 +17,15 @@
package org.apache.ignite.internal;
+import org.apache.ignite.*;
import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import java.io.*;
-import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
/**
*
@@ -39,10 +41,10 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** */
@GridToStringExclude
- private final Collection<Runnable> lsnrs = new GridSetWrapper<>(new IdentityHashMap<Runnable, Object>());
+ private IgniteFutureImpl<?> reconnectFut;
/** */
- private volatile GridKernalState state = GridKernalState.STOPPED;
+ private final AtomicReference<GridKernalState> state = new AtomicReference<>(GridKernalState.STOPPED);
/** */
@GridToStringExclude
@@ -63,12 +65,6 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
}
/** {@inheritDoc} */
- @Override public void lightCheck() throws IllegalStateException {
- if (state != GridKernalState.STARTED)
- throw illegalState();
- }
-
- /** {@inheritDoc} */
@SuppressWarnings({"LockAcquiredButNotSafelyReleased", "BusyWait"})
@Override public void readLock() throws IllegalStateException {
if (stackTrace == null)
@@ -76,10 +72,18 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
rwLock.readLock();
+ GridKernalState state = this.state.get();
+
if (state != GridKernalState.STARTED) {
// Unlock just acquired lock.
rwLock.readUnlock();
+ if (state == GridKernalState.DISCONNECTED) {
+ assert reconnectFut != null;
+
+ throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName);
+ }
+
throw illegalState();
}
}
@@ -90,6 +94,9 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
stackTrace = stackTrace();
rwLock.readLock();
+
+ if (state.get() == GridKernalState.DISCONNECTED)
+ throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName);
}
/** {@inheritDoc} */
@@ -137,6 +144,27 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
return false;
}
+ /** {@inheritDoc} */
+ @Override public GridFutureAdapter<?> onDisconnected() {
+ GridFutureAdapter<?> fut = new GridFutureAdapter<>();
+
+ reconnectFut = new IgniteFutureImpl<>(fut);
+
+ if (!state.compareAndSet(GridKernalState.STARTED, GridKernalState.DISCONNECTED)) {
+ ((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone(new IgniteCheckedException("Node stopped."));
+
+ return null;
+ }
+
+ return fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected() {
+ if (state.compareAndSet(GridKernalState.DISCONNECTED, GridKernalState.STARTED))
+ ((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone();
+ }
+
/**
* Retrieves user stack trace.
*
@@ -171,46 +199,15 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
assert state != null;
// NOTE: this method should always be called within write lock.
- this.state = state;
+ this.state.set(state);
- if (state == GridKernalState.STOPPING) {
- Runnable[] runs;
-
- synchronized (lsnrs) {
- lsnrs.toArray(runs = new Runnable[lsnrs.size()]);
- }
-
- // In the same thread.
- for (Runnable r : runs)
- r.run();
- }
+ if (reconnectFut != null)
+ ((GridFutureAdapter<?>)reconnectFut.internalFuture()).onDone(new IgniteCheckedException("Node stopped."));
}
/** {@inheritDoc} */
@Override public GridKernalState getState() {
- return state;
- }
-
- /** {@inheritDoc} */
- @Override public void addStopListener(Runnable lsnr) {
- assert lsnr != null;
-
- if (state == GridKernalState.STARTING || state == GridKernalState.STARTED)
- synchronized (lsnrs) {
- lsnrs.add(lsnr);
- }
- else
- // Call right away in the same thread.
- lsnr.run();
- }
-
- /** {@inheritDoc} */
- @Override public void removeStopListener(Runnable lsnr) {
- assert lsnr != null;
-
- synchronized (lsnrs) {
- lsnrs.remove(lsnr);
- }
+ return state.get();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java
index fbb8f45..7d63578 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java
@@ -32,6 +32,9 @@ public enum GridKernalState {
/** Kernal is stopping. */
STOPPING,
+ /** Kernal is disconnected. */
+ DISCONNECTED,
+
/** Kernal is stopped.
* <p>
* This is also the initial state of the kernal.
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
index b438bc1..55a84c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.*;
import org.apache.ignite.spi.*;
import org.jetbrains.annotations.*;
@@ -64,6 +65,16 @@ public class GridPluginComponent implements GridComponent {
}
/** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected(boolean clusterRestarted) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
plugin.onIgniteStop(cancel);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java
new file mode 100644
index 0000000..e58530d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+
+/**
+ *
+ */
+public class IgniteClientDisconnectedCheckedException extends IgniteCheckedException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private IgniteFuture<?> reconnectFut;
+
+ /**
+ * @param reconnectFut Reconnect future.
+ * @param msg Message.
+ */
+ public IgniteClientDisconnectedCheckedException(IgniteFuture<?> reconnectFut, String msg) {
+ super(msg);
+
+ this.reconnectFut = reconnectFut;
+ }
+
+ /**
+ * @return Reconnect future.
+ */
+ public IgniteFuture<?> reconnectFuture() {
+ return reconnectFut;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 024dc7b..0d4ce32 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.session.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -902,82 +903,87 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
@Override public void run() {
if (log.isInfoEnabled()) {
- ClusterMetrics m = cluster().localNode().metrics();
+ try {
+ ClusterMetrics m = cluster().localNode().metrics();
- double cpuLoadPct = m.getCurrentCpuLoad() * 100;
- double avgCpuLoadPct = m.getAverageCpuLoad() * 100;
- double gcPct = m.getCurrentGcCpuLoad() * 100;
+ double cpuLoadPct = m.getCurrentCpuLoad() * 100;
+ double avgCpuLoadPct = m.getAverageCpuLoad() * 100;
+ double gcPct = m.getCurrentGcCpuLoad() * 100;
- long heapUsed = m.getHeapMemoryUsed();
- long heapMax = m.getHeapMemoryMaximum();
+ long heapUsed = m.getHeapMemoryUsed();
+ long heapMax = m.getHeapMemoryMaximum();
- long heapUsedInMBytes = heapUsed / 1024 / 1024;
- long heapCommInMBytes = m.getHeapMemoryCommitted() / 1024 / 1024;
+ long heapUsedInMBytes = heapUsed / 1024 / 1024;
+ long heapCommInMBytes = m.getHeapMemoryCommitted() / 1024 / 1024;
- double freeHeapPct = heapMax > 0 ? ((double)((heapMax - heapUsed) * 100)) / heapMax : -1;
+ double freeHeapPct = heapMax > 0 ? ((double)((heapMax - heapUsed) * 100)) / heapMax : -1;
- int hosts = 0;
- int nodes = 0;
- int cpus = 0;
+ int hosts = 0;
+ int nodes = 0;
+ int cpus = 0;
- try {
- ClusterMetrics metrics = cluster().metrics();
+ try {
+ ClusterMetrics metrics = cluster().metrics();
- Collection<ClusterNode> nodes0 = cluster().nodes();
+ Collection<ClusterNode> nodes0 = cluster().nodes();
- hosts = U.neighborhood(nodes0).size();
- nodes = metrics.getTotalNodes();
- cpus = metrics.getTotalCpus();
- }
- catch (IgniteException ignore) {
- // No-op.
- }
+ hosts = U.neighborhood(nodes0).size();
+ nodes = metrics.getTotalNodes();
+ cpus = metrics.getTotalCpus();
+ }
+ catch (IgniteException ignore) {
+ // No-op.
+ }
- int pubPoolActiveThreads = 0;
- int pubPoolIdleThreads = 0;
- int pubPoolQSize = 0;
+ int pubPoolActiveThreads = 0;
+ int pubPoolIdleThreads = 0;
+ int pubPoolQSize = 0;
- if (execSvc instanceof ThreadPoolExecutor) {
- ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
+ if (execSvc instanceof ThreadPoolExecutor) {
+ ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
- int poolSize = exec.getPoolSize();
+ int poolSize = exec.getPoolSize();
- pubPoolActiveThreads = Math.min(poolSize, exec.getActiveCount());
- pubPoolIdleThreads = poolSize - pubPoolActiveThreads;
- pubPoolQSize = exec.getQueue().size();
- }
+ pubPoolActiveThreads = Math.min(poolSize, exec.getActiveCount());
+ pubPoolIdleThreads = poolSize - pubPoolActiveThreads;
+ pubPoolQSize = exec.getQueue().size();
+ }
- int sysPoolActiveThreads = 0;
- int sysPoolIdleThreads = 0;
- int sysPoolQSize = 0;
+ int sysPoolActiveThreads = 0;
+ int sysPoolIdleThreads = 0;
+ int sysPoolQSize = 0;
- if (sysExecSvc instanceof ThreadPoolExecutor) {
- ThreadPoolExecutor exec = (ThreadPoolExecutor)sysExecSvc;
+ if (sysExecSvc instanceof ThreadPoolExecutor) {
+ ThreadPoolExecutor exec = (ThreadPoolExecutor)sysExecSvc;
- int poolSize = exec.getPoolSize();
+ int poolSize = exec.getPoolSize();
- sysPoolActiveThreads = Math.min(poolSize, exec.getActiveCount());
- sysPoolIdleThreads = poolSize - sysPoolActiveThreads;
- sysPoolQSize = exec.getQueue().size();
- }
+ sysPoolActiveThreads = Math.min(poolSize, exec.getActiveCount());
+ sysPoolIdleThreads = poolSize - sysPoolActiveThreads;
+ sysPoolQSize = exec.getQueue().size();
+ }
- String id = U.id8(localNode().id());
+ String id = U.id8(localNode().id());
- String msg = NL +
- "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
- " ^-- Node [id=" + id + ", name=" + name() + "]" + NL +
- " ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
- " ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
+ String msg = NL +
+ "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
+ " ^-- Node [id=" + id + ", name=" + name() + "]" + NL +
+ " ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
+ " ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +
- " ^-- Heap [used=" + dblFmt.format(heapUsedInMBytes) + "MB, free=" +
+ " ^-- Heap [used=" + dblFmt.format(heapUsedInMBytes) + "MB, free=" +
dblFmt.format(freeHeapPct) + "%, comm=" + dblFmt.format(heapCommInMBytes) + "MB]" + NL +
- " ^-- Public thread pool [active=" + pubPoolActiveThreads + ", idle=" +
+ " ^-- Public thread pool [active=" + pubPoolActiveThreads + ", idle=" +
pubPoolIdleThreads + ", qSize=" + pubPoolQSize + "]" + NL +
- " ^-- System thread pool [active=" + sysPoolActiveThreads + ", idle=" +
+ " ^-- System thread pool [active=" + sysPoolActiveThreads + ", idle=" +
sysPoolIdleThreads + ", qSize=" + sysPoolQSize + "]" + NL +
- " ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]";
+ " ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]";
- log.info(msg);
+ log.info(msg);
+ }
+ catch (IgniteClientDisconnectedException ignore) {
+ // No-op.
+ }
}
}
}, metricsLogFreq, metricsLogFreq);
@@ -1676,7 +1682,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
GridKernalState state = gw.getState();
- if (state == STARTED)
+ if (state == STARTED || state == DISCONNECTED)
firstStop = true;
else if (state == STARTING)
U.warn(log, "Attempt to stop starting grid. This operation " +
@@ -1753,7 +1759,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (cache != null)
cache.blockGateways();
- assert gw.getState() == STARTED || gw.getState() == STARTING;
+ assert gw.getState() == STARTED || gw.getState() == STARTING || gw.getState() == DISCONNECTED;
// No more kernal calls from this point on.
gw.setState(STOPPING);
@@ -2186,6 +2192,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
return false;
}
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
finally {
unguard();
}
@@ -2801,6 +2810,109 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/**
+ * Disconnect callback: notifies components in reverse order and stops the node if any of them failed.
+ */
+ public void onDisconnected() {
+ Throwable err = null;
+
+ GridFutureAdapter<?> reconnectFut = ctx.gateway().onDisconnected();
+
+ if (reconnectFut == null) {
+ assert ctx.gateway().getState() != STARTED : ctx.gateway().getState();
+
+ return;
+ }
+
+ IgniteFuture<?> userFut = new IgniteFutureImpl<>(reconnectFut); // User-facing wrapper over the gateway future.
+
+ ctx.cluster().get().clientReconnectFuture(userFut);
+
+ ctx.disconnected(true);
+
+ List<GridComponent> comps = ctx.components();
+
+ for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) { // Reverse order.
+ GridComponent comp = it.previous();
+
+ try {
+ if (!skipDaemon(comp))
+ comp.onDisconnected(userFut);
+ }
+ catch (IgniteCheckedException e) {
+ err = e; // Remember the failure, remaining components are still notified.
+ }
+ catch (Throwable e) {
+ err = e;
+
+ if (e instanceof Error)
+ throw e;
+ }
+ }
+
+ for (GridCacheContext cctx : ctx.cache().context().cacheContexts()) {
+ cctx.gate().writeLock(); // NOTE(review): presumably waits for operations currently inside the gate - confirm.
+
+ cctx.gate().writeUnlock();
+ }
+
+ ctx.gateway().writeLock(); // NOTE(review): same lock/unlock pattern for the kernal gateway - confirm intent.
+
+ ctx.gateway().writeUnlock();
+
+ if (err != null) {
+ reconnectFut.onDone(err); // Complete the reconnect future with the last recorded error.
+
+ U.error(log, "Failed to reconnect, will stop node", err);
+
+ close();
+ }
+ }
+
+ /**
+ * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected.
+ */
+ public void onReconnected(final boolean clusterRestarted) {
+ Throwable err = null;
+
+ try {
+ ctx.disconnected(false);
+
+ for (GridComponent comp : ctx.components()) // Forward order, unlike onDisconnected().
+ comp.onReconnected(clusterRestarted);
+
+ ctx.cache().context().exchange().reconnectExchangeFuture().listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ try {
+ fut.get();
+
+ ctx.gateway().onReconnected(); // Gateway is notified only after the reconnect exchange future succeeded.
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to reconnect, will stop node", e);
+
+ close();
+ }
+ }
+ });
+ }
+ catch (IgniteCheckedException e) {
+ err = e;
+ }
+ catch (Throwable e) {
+ err = e;
+
+ if (e instanceof Error)
+ throw e; // Errors propagate to the caller, so the stop-node handling below is skipped for them.
+ }
+
+ if (err != null) {
+ U.error(log, "Failed to reconnect, will stop node", err);
+
+ close();
+ }
+ }
+
+ /**
* Creates optional component.
*
* @param cls Component interface.
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
index 26c704c..51cf523 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
@@ -287,6 +287,11 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
}
/** {@inheritDoc} */
+ @Nullable @Override public IgniteFuture<?> clientReconnectFuture() {
+ return cluster.clientReconnectFuture();
+ }
+
+ /** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
cluster = (IgniteClusterImpl)in.readObject();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index 3c937b0..0287ca7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -52,6 +52,9 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
@GridToStringExclude
private ConcurrentMap nodeLoc;
+ /** Client reconnect future. */
+ private IgniteFuture<?> reconnecFut;
+
/**
* Required by {@link Externalizable}.
*/
@@ -120,6 +123,9 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
try {
return ctx.discovery().pingNode(nodeId);
}
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
finally {
unguard();
}
@@ -501,6 +507,18 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
nodeLoc.clear();
}
+ /**
+ * @param reconnecFut Reconnect future.
+ */
+ public void clientReconnectFuture(IgniteFuture<?> reconnecFut) {
+ this.reconnecFut = reconnecFut;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteFuture<?> clientReconnectFuture() {
+ return reconnecFut;
+ }
+
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
ctx = (GridKernalContext)in.readObject();
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 40a5ea5..298ff24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -166,6 +166,18 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
// No-op.
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ for (T t : spis)
+ t.onClientDisconnected(reconnectFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ for (T t : spis)
+ t.onClientReconnected(clusterRestarted);
+ }
+
/**
* Starts wrapped SPI.
*
@@ -318,7 +330,12 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
@Override public boolean pingNode(UUID nodeId) {
A.notNull(nodeId, "nodeId");
- return ctx.discovery().pingNode(nodeId);
+ try {
+ return ctx.discovery().pingNode(nodeId);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
}
@Override public void send(ClusterNode node, Serializable msg, String topic)
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index 443b221..3b886a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -293,7 +293,7 @@ class GridDeploymentCommunication {
log.debug("Sent peer class loading response [node=" + node.id() + ", res=" + res + ']');
}
catch (IgniteCheckedException e) {
- if (ctx.discovery().pingNode(nodeId))
+ if (ctx.discovery().pingNodeNoError(nodeId))
U.error(log, "Failed to send peer class loading response to node: " + nodeId, e);
else if (log.isDebugEnabled())
log.debug("Failed to send peer class loading response to node " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
index 75fe98f..75fb41e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
@@ -94,13 +94,7 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
comm.start();
- locStore = new GridDeploymentLocalStore(getSpi(), ctx, comm);
- ldrStore = new GridDeploymentPerLoaderStore(getSpi(), ctx, comm);
- verStore = new GridDeploymentPerVersionStore(getSpi(), ctx, comm);
-
- locStore.start();
- ldrStore.start();
- verStore.start();
+ startStores();
if (log.isDebugEnabled()) {
log.debug("Local deployment: " + locDep);
@@ -110,17 +104,24 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
}
/** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- GridProtocolHandler.deregisterDeploymentManager();
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ storesOnKernalStop();
- if (verStore != null)
- verStore.stop();
+ storesStop();
- if (ldrStore != null)
- ldrStore.stop();
+ startStores();
+ }
- if (locStore != null)
- locStore.stop();
+ /** {@inheritDoc} */
+ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ storesOnKernalStart();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ GridProtocolHandler.deregisterDeploymentManager();
+
+ storesStop();
if (comm != null)
comm.stop();
@@ -135,21 +136,12 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
/** {@inheritDoc} */
@Override public void onKernalStart0() throws IgniteCheckedException {
- locStore.onKernalStart();
- ldrStore.onKernalStart();
- verStore.onKernalStart();
+ storesOnKernalStart();
}
/** {@inheritDoc} */
@Override public void onKernalStop0(boolean cancel) {
- if (verStore != null)
- verStore.onKernalStop();
-
- if (ldrStore != null)
- ldrStore.onKernalStop();
-
- if (locStore != null)
- locStore.onKernalStop();
+ storesOnKernalStop();
}
/** {@inheritDoc} */
@@ -547,6 +539,57 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
return ldr instanceof GridDeploymentClassLoader;
}
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void startStores() throws IgniteCheckedException {
+ locStore = new GridDeploymentLocalStore(getSpi(), ctx, comm); // New store instances are created on every call.
+ ldrStore = new GridDeploymentPerLoaderStore(getSpi(), ctx, comm);
+ verStore = new GridDeploymentPerVersionStore(getSpi(), ctx, comm);
+
+ locStore.start(); // Start order: local, per-loader, per-version.
+ ldrStore.start();
+ verStore.start();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void storesOnKernalStart() throws IgniteCheckedException {
+ locStore.onKernalStart(); // Same local, per-loader, per-version order as in startStores().
+ ldrStore.onKernalStart();
+ verStore.onKernalStart();
+ }
+
+ /**
+ * Notifies the deployment stores about kernal stop in reverse of their start order, skipping stores that are {@code null}.
+ */
+ private void storesOnKernalStop() {
+ if (verStore != null)
+ verStore.onKernalStop();
+
+ if (ldrStore != null)
+ ldrStore.onKernalStop();
+
+ if (locStore != null)
+ locStore.onKernalStop();
+ }
+
+ /**
+ * Stops the deployment stores in reverse of their start order, skipping stores that are {@code null}.
+ */
+ private void storesStop() {
+ if (verStore != null)
+ verStore.stop();
+
+ if (ldrStore != null)
+ ldrStore.stop();
+
+ if (locStore != null)
+ locStore.stop();
+ }
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index b35628c..068d374 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -188,14 +188,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Received custom messages history. */
private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>();
+ /** Latch counted down when the local node join event is processed. */
+ private final CountDownLatch startLatch = new CountDownLatch(1);
+
/** @param ctx Context. */
public GridDiscoveryManager(GridKernalContext ctx) {
super(ctx, ctx.config().getDiscoverySpi());
}
- /** */
- private final CountDownLatch startLatch = new CountDownLatch(1);
-
/**
* @return Memory usage of non-heap memory.
*/
@@ -337,7 +337,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
isLocDaemon = ctx.isDaemon();
- hasRslvrs = !F.isEmpty(ctx.config().getSegmentationResolvers());
+ hasRslvrs = !ctx.config().isClientMode() && !F.isEmpty(ctx.config().getSegmentationResolvers());
segChkFreq = ctx.config().getSegmentCheckFrequency();
@@ -380,14 +380,24 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
spi.setListener(new DiscoverySpiListener() {
+ private long gridStartTime;
+
@Override public void onDiscovery(
- int type,
- long topVer,
- ClusterNode node,
- Collection<ClusterNode> topSnapshot,
- Map<Long, Collection<ClusterNode>> snapshots,
+ final int type,
+ final long topVer,
+ final ClusterNode node,
+ final Collection<ClusterNode> topSnapshot,
+ final Map<Long, Collection<ClusterNode>> snapshots,
@Nullable DiscoverySpiCustomMessage spiCustomMsg
) {
+ if (type == EVT_NODE_JOINED && node.isLocal() && ctx.clientDisconnected()) {
+ discoCacheHist.clear();
+
+ topHist.clear();
+
+ topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, null));
+ }
+
DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null
: ((CustomMessageWrapper)spiCustomMsg).delegate();
@@ -415,7 +425,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
verChanged = false;
}
else {
- if (type != EVT_NODE_SEGMENTED) {
+ if (type != EVT_NODE_SEGMENTED &&
+ type != EVT_CLIENT_NODE_DISCONNECTED &&
+ type != EVT_CLIENT_NODE_RECONNECTED) {
minorTopVer = 0;
verChanged = true;
@@ -424,7 +436,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
verChanged = false;
}
- AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
+ final AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) {
for (DiscoCache c : discoCacheHist.values())
@@ -467,11 +479,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
// If this is a local join event, just save it and do not notify listeners.
if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) {
+ if (gridStartTime == 0)
+ gridStartTime = getSpi().getGridStartTime();
+
updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()),
new DiscoCache(localNode(), getSpi().getRemoteNodes()));
- assert startLatch.getCount() == 1;
-
startLatch.countDown();
DiscoveryEvent discoEvt = new DiscoveryEvent();
@@ -491,6 +504,46 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
return;
}
+ else if (type == EVT_CLIENT_NODE_DISCONNECTED) {
+ /*
+ * Notify all components from discovery thread to avoid concurrent
+ * reconnect while disconnect handling is in progress.
+ */
+
+ assert locNode.isClient() : locNode;
+ assert node.isClient() : node;
+
+ ((IgniteKernal)ctx.grid()).onDisconnected();
+
+ locJoinEvt = new GridFutureAdapter<>();
+
+ registeredCaches.clear();
+ }
+ else if (type == EVT_CLIENT_NODE_RECONNECTED) {
+ assert locNode.isClient() : locNode;
+ assert node.isClient() : node;
+
+ boolean clusterRestarted = gridStartTime != getSpi().getGridStartTime();
+
+ gridStartTime = getSpi().getGridStartTime();
+
+ ((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted);
+
+ ctx.cluster().clientReconnectFuture().listen(new CI1<IgniteFuture<?>>() {
+ @Override public void apply(IgniteFuture<?> fut) {
+ try {
+ fut.get();
+
+ discoWrk.addEvent(type, nextTopVer, node, topSnapshot, null);
+ }
+ catch (IgniteException ignore) {
+ // No-op.
+ }
+ }
+ });
+
+ return;
+ }
discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
}
@@ -967,7 +1020,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
Collection<ClusterNode> rmtNodes = discoCache.remoteNodes();
- Collection<ClusterNode> serverNodes = F.view(discoCache.allNodes(), F.not(clientFilter));
+ Collection<ClusterNode> srvNodes = F.view(discoCache.allNodes(), F.not(clientFilter));
Collection<ClusterNode> clientNodes = F.view(discoCache.allNodes(), clientFilter);
@@ -987,7 +1040,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
double heap = U.heapSize(allNodes, 2);
if (log.isQuiet())
- U.quiet(false, topologySnapshotMessage(serverNodes.size(), clientNodes.size(), totalCpus, heap));
+ U.quiet(false, topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap));
if (log.isDebugEnabled()) {
String dbg = "";
@@ -997,7 +1050,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
">>> " + PREFIX + "." + U.nl() +
">>> +----------------+" + U.nl() +
">>> Grid name: " + (ctx.gridName() == null ? "default" : ctx.gridName()) + U.nl() +
- ">>> Number of server nodes: " + serverNodes.size() + U.nl() +
+ ">>> Number of server nodes: " + srvNodes.size() + U.nl() +
">>> Number of client nodes: " + clientNodes.size() + U.nl() +
(discoOrdered ? ">>> Topology version: " + topVer + U.nl() : "") +
">>> Topology hash: 0x" + Long.toHexString(hash).toUpperCase() + U.nl();
@@ -1031,20 +1084,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
log.debug(dbg);
}
else if (log.isInfoEnabled())
- log.info(topologySnapshotMessage(serverNodes.size(), clientNodes.size(), totalCpus, heap));
+ log.info(topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap));
}
/**
- * @param serverNodesNum Server nodes number.
+ * @param srvNodesNum Server nodes number.
* @param clientNodesNum Client nodes number.
* @param totalCpus Total cpu number.
* @param heap Heap size.
* @return Topology snapshot message.
*/
- private String topologySnapshotMessage(int serverNodesNum, int clientNodesNum, int totalCpus, double heap) {
+ private String topologySnapshotMessage(int srvNodesNum, int clientNodesNum, int totalCpus, double heap) {
return PREFIX + " [" +
(discoOrdered ? "ver=" + topSnap.get().topVer.topologyVersion() + ", " : "") +
- "server nodes=" + serverNodesNum +
+ "server nodes=" + srvNodesNum +
", client nodes=" + clientNodesNum +
", CPUs=" + totalCpus +
", heap=" + heap + "GB" +
@@ -1134,8 +1187,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* @param nodeId ID of the node.
* @return {@code True} if ping succeeded.
+ * @throws IgniteClientDisconnectedCheckedException If ping failed.
*/
- public boolean pingNode(UUID nodeId) {
+ public boolean pingNode(UUID nodeId) throws IgniteClientDisconnectedCheckedException {
assert nodeId != null;
if (!busyLock.enterBusy())
@@ -1144,6 +1198,36 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
try {
return getSpi().pingNode(nodeId);
}
+ catch (IgniteException e) {
+ if (e.hasCause(IgniteClientDisconnectedCheckedException.class)) {
+ IgniteFuture<?> reconnectFut = ctx.cluster().clientReconnectFuture();
+
+ throw new IgniteClientDisconnectedCheckedException(reconnectFut, e.getMessage());
+ }
+
+ throw e;
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * @param nodeId ID of the node.
+ * @return {@code True} if ping succeeded.
+ */
+ public boolean pingNodeNoError(UUID nodeId) {
+ assert nodeId != null;
+
+ if (!busyLock.enterBusy())
+ return false;
+
+ try {
+ return getSpi().pingNode(nodeId);
+ }
+ catch (IgniteException e) {
+ return false;
+ }
finally {
busyLock.leaveBusy();
}
@@ -1519,9 +1603,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* @param msg Custom message.
+ * @throws IgniteCheckedException If failed.
*/
- public void sendCustomEvent(DiscoveryCustomMessage msg) {
- getSpi().sendCustomEvent(new CustomMessageWrapper(msg));
+ public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteCheckedException {
+ try {
+ getSpi().sendCustomEvent(new CustomMessageWrapper(msg));
+ }
+ catch (IgniteClientDisconnectedException e) {
+ IgniteFuture<?> reconnectFut = ctx.cluster().clientReconnectFuture();
+
+ throw new IgniteClientDisconnectedCheckedException(reconnectFut, e.getMessage());
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
}
/**
@@ -1743,6 +1838,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
else if (type == EVT_NODE_SEGMENTED)
evt.message("Node segmented: " + node);
+ else if (type == EVT_CLIENT_NODE_DISCONNECTED)
+ evt.message("Client node disconnected: " + node);
+
+ else if (type == EVT_CLIENT_NODE_RECONNECTED)
+ evt.message("Client node reconnected: " + node);
+
else
assert false;
@@ -1755,6 +1856,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param topVer Topology version.
* @param node Node.
* @param topSnapshot Topology snapshot.
+ * @param data Custom message.
*/
void addEvent(
int type,
@@ -1864,6 +1966,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
break;
}
+ case EVT_CLIENT_NODE_DISCONNECTED: {
+ // No-op.
+
+ break;
+ }
+
+ case EVT_CLIENT_NODE_RECONNECTED: {
+ if (log.isInfoEnabled())
+ log.info("Client node reconnected to topology: " + node);
+
+ ackTopology(topVer.topologyVersion(), true);
+
+ break;
+ }
+
case EVT_NODE_FAILED: {
// Check only if resolvers were configured.
if (hasRslvrs)
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index a84c48a..8baf95c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.spi.*;
import org.jetbrains.annotations.*;
@@ -62,6 +63,16 @@ public abstract class GridProcessorAdapter implements GridProcessor {
}
/** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 6989385..d40128c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -69,7 +69,7 @@ public class GridAffinityAssignmentCache {
private IgniteLogger log;
/** Node stop flag. */
- private volatile boolean stopping;
+ private volatile IgniteCheckedException stopErr;
/**
* Constructs affinity cached calculations.
@@ -130,18 +130,28 @@ public class GridAffinityAssignmentCache {
/**
* Kernal stop callback.
+ *
+ * @param err Error.
*/
- public void onKernalStop() {
- stopping = true;
-
- IgniteCheckedException err =
- new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.");
+ public void onKernalStop(IgniteCheckedException err) {
+ stopErr = err;
for (AffinityReadyFuture fut : readyFuts.values())
fut.onDone(err);
}
/**
+ * Reconnect callback: clears {@code affCache}, resets {@code head} to an empty assignment and drops the stored stop error.
+ */
+ public void onReconnected() {
+ affCache.clear();
+
+ head.set(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
+
+ stopErr = null;
+ }
+
+ /**
* Calculates affinity cache for given topology version.
*
* @param topVer Topology version to calculate affinity cache for.
@@ -312,8 +322,8 @@ public class GridAffinityAssignmentCache {
fut.onDone(topVer);
}
- else if (stopping)
- fut.onDone(new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."));
+ else if (stopErr != null)
+ fut.onDone(stopErr);
return fut;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java
index 29e50b6..9e765d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.lang.*;
/**
* OS conflict resolver manager.
@@ -55,4 +56,9 @@ public class CacheOsConflictResolutionManager<K ,V> implements CacheConflictReso
@Override public void printMemoryStats() {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ // No-op.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index dfc39c1..1e8184d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -43,6 +43,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
/** Custom message ID. */
private IgniteUuid id = IgniteUuid.randomUuid();
+ /** {@code True} if this is discovery data sent on client reconnect. */
+ private boolean clientReconnect;
+
/**
* @param reqs Requests.
*/
@@ -93,6 +96,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
return false;
}
+ /**
+ * @param clientReconnect {@code True} if this is discovery data sent on client reconnect.
+ */
+ public void clientReconnect(boolean clientReconnect) {
+ this.clientReconnect = clientReconnect;
+ }
+
+ /**
+ * @return {@code True} if this is discovery data sent on client reconnect.
+ */
+ public boolean clientReconnect() {
+ return clientReconnect;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeBatch.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index e138520..d2a730a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -212,7 +212,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
@SuppressWarnings("OverriddenMethodCallDuringObjectConstruction")
protected GridCacheAdapter(GridCacheContext<K, V> ctx, int startSize) {
- this(ctx, new GridCacheConcurrentMap(ctx, startSize, 0.75F));
+ this(ctx, new GridCacheConcurrentMap(ctx, startSize, 0.75F, null));
}
/**
@@ -2868,7 +2868,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
- return (GridCacheReturn)tx.removeAllAsync(ctx,
+ return tx.removeAllAsync(ctx,
Collections.singletonList(key),
null,
true,
@@ -2934,7 +2934,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
- return (GridCacheReturn) tx.putAllAsync(ctx,
+ return tx.putAllAsync(ctx,
F.t(key, newVal),
true,
null,
@@ -3036,7 +3036,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.deploy().registerClass(val);
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
- ctx.equalsValArray(val)).get().success();
+ ctx.equalsValArray(val)).get().success();
}
@Override public String toString() {
@@ -3250,10 +3250,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
TransactionConfiguration cfg = ctx.gridConfig().getTransactionConfiguration();
return txStart(
- concurrency,
- isolation,
- cfg.getDefaultTxTimeout(),
- 0
+ concurrency,
+ isolation,
+ cfg.getDefaultTxTimeout(),
+ 0
);
}
@@ -3689,7 +3689,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return F.iterator(iterator(),
new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() {
private IgniteCacheExpiryPolicy expiryPlc =
- ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
+ ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
@Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) {
CacheOperationContext prev = ctx.gate().enter(opCtx);
@@ -4443,6 +4443,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
public abstract void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver);
/**
+ * Reconnect callback; this implementation does nothing.
+ */
+ public void onReconnected() {
+ // No-op.
+ }
+
+ /**
* Validates that given cache value implements {@link Externalizable}.
*
* @param val Cache value.
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index ea17df1..88e5e40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -54,7 +55,25 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
- aff.onKernalStop();
+ IgniteCheckedException err =
+ new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.");
+
+ aff.onKernalStop(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
+ "Failed to wait for topology update, client disconnected.");
+
+ aff.onKernalStop(err); // Same path as kernal stop, but with an error that carries the reconnect future.
+ }
+
+ /**
+ * Forwards the reconnect callback to {@code aff}.
+ */
+ public void onReconnected() {
+ aff.onReconnected();
+ }
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
index db5eed1..966dcc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
@@ -279,11 +279,17 @@ public class GridCacheConcurrentMap {
* @param loadFactor the load factor threshold, used to control resizing.
* Resizing may be performed when the average number of elements per
* bin exceeds this threshold.
+ * @param factory Entries factory.
* @throws IllegalArgumentException if the initial capacity of
* elements is negative or the load factor is non-positive.
*/
- public GridCacheConcurrentMap(GridCacheContext ctx, int initCap, float loadFactor) {
+ public GridCacheConcurrentMap(GridCacheContext ctx,
+ int initCap,
+ float loadFactor,
+ @Nullable GridCacheMapEntryFactory factory) {
this(ctx, initCap, loadFactor, DFLT_CONCUR_LEVEL);
+
+ this.factory = factory;
}
/**
@@ -312,6 +318,13 @@ public class GridCacheConcurrentMap {
}
/**
+ * @return Entries factory.
+ */
+ public GridCacheMapEntryFactory getEntryFactory() {
+ return factory;
+ }
+
+ /**
* @return Non-internal predicate.
*/
private static CacheEntryPredicate[] nonInternal() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index f2beb0a..a3c8da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -22,8 +22,12 @@ import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
+import javax.cache.*;
+import java.util.concurrent.atomic.*;
+
/**
* Cache gateway.
*/
@@ -33,7 +37,10 @@ public class GridCacheGateway<K, V> {
private final GridCacheContext<K, V> ctx;
/** Stopped flag for dynamic caches. */
- private volatile boolean stopped;
+ private final AtomicReference<State> state = new AtomicReference<>(State.STARTED);
+
+ /** Reconnect future stored by {@code onDisconnected} and passed to the exception thrown while disconnected. */
+ private IgniteFuture<?> reconnectFut;
/** */
private GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
@@ -56,11 +63,36 @@ public class GridCacheGateway<K, V> {
rwLock.readLock();
- if (stopped) {
- rwLock.readUnlock();
+ checkState(true, true);
+ }
- throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name());
+ /**
+ * @param lock {@code True} if the read lock is held; it is released before this method throws or returns {@code false}.
+ * @param stopErr {@code True} to throw {@link IllegalStateException} if the cache is stopped, {@code false} to return {@code false} instead.
+ * @return {@code True} if cache is in started state, {@code false} if it is stopped and {@code stopErr} is {@code false}.
+ */
+ private boolean checkState(boolean lock, boolean stopErr) {
+ State state = this.state.get();
+
+ if (state != State.STARTED) {
+ if (lock)
+ rwLock.readUnlock();
+
+ if (state == State.STOPPED) {
+ if (stopErr)
+ throw new IllegalStateException("Cache has been stopped: " + ctx.name());
+ else
+ return false;
+ }
+ else { // DISCONNECTED: always throws, regardless of stopErr.
+ assert reconnectFut != null;
+
+ throw new CacheException(
+ new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + ctx.gridName()));
+ }
}
+
+ return true;
}
/**
@@ -71,17 +103,11 @@ public class GridCacheGateway<K, V> {
public boolean enterIfNotStopped() {
onEnter();
- // Must unlock in case of unexpected errors to avoid
- // deadlocks during kernal stop.
+ // Must unlock in case of unexpected errors to avoid deadlocks during kernal stop.
rwLock.readLock();
- if (stopped) {
- rwLock.readUnlock();
-
- return false;
- }
+ return checkState(true, false);
- return true;
}
/**
@@ -92,7 +118,7 @@ public class GridCacheGateway<K, V> {
public boolean enterIfNotStoppedNoLock() {
onEnter();
- return !stopped;
+ return checkState(false, false);
}
/**
@@ -144,11 +170,7 @@ public class GridCacheGateway<K, V> {
rwLock.readLock();
- if (stopped) {
- rwLock.readUnlock();
-
- throw new IllegalStateException("Cache has been stopped: " + ctx.name());
- }
+ checkState(true, true);
// Must unlock in case of unexpected errors to avoid
// deadlocks during kernal stop.
@@ -169,8 +191,7 @@ public class GridCacheGateway<K, V> {
@Nullable public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) {
onEnter();
- if (stopped)
- throw new IllegalStateException("Cache has been stopped: " + ctx.name());
+ checkState(false, true); // No lock is held here; still throw if the cache is stopped, as the removed check did.
return setOperationContextPerCall(opCtx);
}
@@ -229,8 +250,42 @@ public class GridCacheGateway<K, V> {
/**
*
*/
- public void block() {
- stopped = true;
+ public void stopped() {
+ state.set(State.STOPPED);
+ }
+
+ /**
+ * @param reconnectFut Reconnect future; must not be {@code null}.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ assert reconnectFut != null;
+
+ this.reconnectFut = reconnectFut; // Assigned before the state change below.
+
+ state.compareAndSet(State.STARTED, State.DISCONNECTED); // Only a STARTED gateway becomes DISCONNECTED.
+ }
+
+ /**
+ * Acquires the write lock of {@code rwLock}.
+ */
+ public void writeLock(){
+ rwLock.writeLock();
+ }
+
+ /**
+ * Releases the write lock of {@code rwLock}.
+ */
+ public void writeUnlock() {
+ rwLock.writeUnlock();
+ }
+
+ /**
+ * @param stopped Cache stopped flag: {@code true} selects STOPPED as the new state, {@code false} selects STARTED.
+ */
+ public void reconnected(boolean stopped) {
+ State newState = stopped ? State.STOPPED : State.STARTED;
+
+ state.compareAndSet(State.DISCONNECTED, newState); // Has no effect unless the state is currently DISCONNECTED.
+ }
/**
@@ -256,11 +311,24 @@ public class GridCacheGateway<K, V> {
Thread.currentThread().interrupt();
try {
- // No-op.
- stopped = true;
+ state.set(State.STOPPED);
}
finally {
rwLock.writeUnlock();
}
}
+
+ /**
+ * Gateway state examined by {@code checkState}.
+ */
+ private enum State {
+ /** Initial state; {@code checkState} returns {@code true}. */
+ STARTED,
+
+ /** Entered from STARTED by {@code onDisconnected}; {@code checkState} throws {@link CacheException}. */
+ DISCONNECTED,
+
+ /** Cache stopped; {@code checkState} throws {@link IllegalStateException} or returns {@code false}. */
+ STOPPED
+ }
}