You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/01 17:11:15 UTC
[2/2] incubator-ignite git commit: # ignite-901 client reconnect WIP
# ignite-901 client reconnect WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f5f3efd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f5f3efd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f5f3efd1
Branch: refs/heads/ignite-901
Commit: f5f3efd164ae0b67917bbbbf1b856b2d24e72217
Parents: 6e23608
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 29 16:01:17 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 1 18:10:57 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/GridComponent.java | 10 +
.../ignite/internal/GridKernalContextImpl.java | 12 +-
.../ignite/internal/GridKernalGateway.java | 43 +-
.../ignite/internal/GridKernalGatewayImpl.java | 115 ++--
.../apache/ignite/internal/GridKernalState.java | 3 +
.../ignite/internal/GridPluginComponent.java | 10 +
.../IgniteDisconnectedCheckedException.java | 32 ++
.../apache/ignite/internal/IgniteKernal.java | 103 +++-
.../ignite/internal/MarshallerContextImpl.java | 11 +-
.../internal/managers/GridManagerAdapter.java | 10 +
.../deployment/GridDeploymentManager.java | 88 ++-
.../discovery/GridDiscoveryManager.java | 31 +-
.../processors/GridProcessorAdapter.java | 10 +
.../processors/cache/GridCacheAdapter.java | 284 ++++++----
.../cache/GridCacheDeploymentManager.java | 5 +
.../processors/cache/GridCacheGateway.java | 128 ++++-
.../processors/cache/GridCacheIoManager.java | 9 +-
.../processors/cache/GridCacheMvccManager.java | 10 +-
.../GridCachePartitionExchangeManager.java | 29 +-
.../processors/cache/GridCacheProcessor.java | 50 +-
.../cache/GridCacheSharedContext.java | 55 +-
.../cache/GridCacheSharedManager.java | 7 +-
.../cache/GridCacheSharedManagerAdapter.java | 14 +-
.../GridDhtPartitionsExchangeFuture.java | 4 +
.../dht/preloader/GridDhtPreloader.java | 7 -
.../distributed/near/GridNearCacheAdapter.java | 5 +
.../processors/task/GridTaskProcessor.java | 2 +-
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 9 +-
.../communication/tcp/TcpCommunicationSpi.java | 56 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 169 ++++--
.../ignite/spi/discovery/tcp/ServerImpl.java | 4 +-
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 2 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
.../tcp/internal/TcpDiscoveryNode.java | 17 +
.../spi/swapspace/file/FileSwapSpaceSpi.java | 2 +-
.../internal/GridUpdateNotifierSelfTest.java | 14 +-
.../IgniteClientReconnectAbstractTest.java | 143 +++++
.../IgniteClientReconnectApiBlockTest.java | 157 ++++++
.../IgniteClientReconnectCacheTest.java | 562 +++++++++++++++++++
...IgniteClientReconnectDiscoveryStateTest.java | 110 ++++
.../GridCacheReplicatedInvalidateSelfTest.java | 3 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 148 ++++-
.../IgniteClientReconnectTestSuite.java | 40 ++
43 files changed, 2162 insertions(+), 363 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index fb227cd..5b3b0c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -116,4 +116,14 @@ public interface GridComponent {
* @return Unique component type for discovery data exchange.
*/
@Nullable public DiscoveryDataExchangeType discoveryDataType();
+
+ /**
+ * Callback invoked on each component (see {@code IgniteKernal.disconnected()}) when the client node is disconnected.
+ */
+ public void onDisconnected() throws IgniteCheckedException;
+
+ /**
+ * Callback invoked on each component (see {@code IgniteKernal.reconnected()}) when the client node has reconnected.
+ */
+ public void onReconnected() throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 65107a7..581c891 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal;
import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.managers.checkpoint.*;
import org.apache.ignite.internal.managers.collision.*;
@@ -501,9 +502,18 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
return ((IgniteKernal)grid).isStopping();
}
+ /** Local node cached by {@link #localNodeId()} after it was obtained from the discovery manager. */
+ private ClusterNode locNode;
+
/** {@inheritDoc} */
@Override public UUID localNodeId() {
- return cfg.getNodeId();
+ if (locNode != null)
+ return locNode.id();
+
+ if (discoMgr != null)
+ locNode = discoMgr.localNode();
+
+ return locNode != null ? locNode.id() : config().getNodeId();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
index 0156136..20d81de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
/**
@@ -39,22 +40,6 @@ import org.apache.ignite.internal.util.tostring.*;
@GridToStringExclude
public interface GridKernalGateway {
/**
- * Performs light-weight check on the kernal state at the moment of this call.
- * <p>
- * This method should only be used when the kernal state should be checked just once
- * at the beginning of the method and the fact that <b>kernal state can change in the middle
- * of such method's execution</b> should not matter.
- * <p>
- * For example, when a method returns a constant value its implementation doesn't depend
- * on the kernal being valid throughout its execution. In such case it is enough to check
- * the kernal's state just once at the beginning of this method to provide consistent behavior
- * of the API without incurring overhead of <code>lock-based</code> guard methods.
- *
- * @throws IllegalStateException Thrown in case when no kernal calls are allowed.
- */
- public void lightCheck() throws IllegalStateException;
-
- /**
* Should be called on entering every kernal related call
* <b>originated directly or indirectly via public API</b>.
* <p>
@@ -113,31 +98,27 @@ public interface GridKernalGateway {
public void writeUnlock();
/**
- * Adds stop listener. Note that the identity set will be used to store listeners for
- * performance reasons. Futures can register a listener to be notified when they need to
- * be internally interrupted.
+ * Gets user stack trace through the first call of grid public API.
*
- * @param lsnr Listener to add.
+ * @return User stack trace.
*/
- public void addStopListener(Runnable lsnr);
+ public String userStackTrace();
/**
- * Removes previously added stop listener.
- *
- * @param lsnr Listener to remove.
+ * @param timeout Timeout.
+ * @return {@code True} if write lock has been acquired.
+ * @throws InterruptedException If interrupted.
*/
- public void removeStopListener(Runnable lsnr);
+ public boolean tryWriteLock(long timeout) throws InterruptedException;
/**
- * Gets user stack trace through the first call of grid public API.
+ * Disconnected callback.
*/
- public String userStackTrace();
+ public void onDisconnected();
/**
- * @param timeout Timeout.
- * @return {@code True} if write lock has been acquired.
- * @throws InterruptedException If interrupted.
+ * Reconnected callback.
*/
- public boolean tryWriteLock(long timeout) throws InterruptedException;
+ public void onReconnected();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index 35bbbed..ef894cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -17,12 +17,13 @@
package org.apache.ignite.internal;
+import org.apache.ignite.*;
import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import java.io.*;
-import java.util.*;
import java.util.concurrent.*;
/**
@@ -38,10 +39,6 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
private final GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
/** */
- @GridToStringExclude
- private final Collection<Runnable> lsnrs = new GridSetWrapper<>(new IdentityHashMap<Runnable, Object>());
-
- /** */
private volatile GridKernalState state = GridKernalState.STOPPED;
/** */
@@ -63,12 +60,6 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
}
/** {@inheritDoc} */
- @Override public void lightCheck() throws IllegalStateException {
- if (state != GridKernalState.STARTED)
- throw illegalState();
- }
-
- /** {@inheritDoc} */
@SuppressWarnings({"LockAcquiredButNotSafelyReleased", "BusyWait"})
@Override public void readLock() throws IllegalStateException {
if (stackTrace == null)
@@ -76,12 +67,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
rwLock.readLock();
- if (state != GridKernalState.STARTED) {
- // Unlock just acquired lock.
- rwLock.readUnlock();
-
- throw illegalState();
- }
+ checkState(true);
}
/** {@inheritDoc} */
@@ -90,6 +76,8 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
stackTrace = stackTrace();
rwLock.readLock();
+
+ checkState(false);
}
/** {@inheritDoc} */
@@ -137,6 +125,36 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
return false;
}
+ /** {@inheritDoc} NOTE(review): state is switched while holding only the read lock, unlike onReconnected() - confirm. */
+ @Override public void onDisconnected() {
+ rwLock.readLock();
+
+ try {
+ if (state == GridKernalState.STARTED)
+ state = GridKernalState.DISCONNECTED;
+ }
+ finally {
+ rwLock.readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected() {
+ rwLock.writeLock();
+
+ try {
+ if (state == GridKernalState.DISCONNECTED)
+ state = GridKernalState.STARTED;
+ }
+ finally {
+ rwLock.writeUnlock();
+ }
+
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+
/**
* Retrieves user stack trace.
*
@@ -173,16 +191,8 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
// NOTE: this method should always be called within write lock.
this.state = state;
- if (state == GridKernalState.STOPPING) {
- Runnable[] runs;
-
- synchronized (lsnrs) {
- lsnrs.toArray(runs = new Runnable[lsnrs.size()]);
- }
-
- // In the same thread.
- for (Runnable r : runs)
- r.run();
+ synchronized (this) {
+ notifyAll();
}
}
@@ -192,33 +202,42 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
}
/** {@inheritDoc} */
- @Override public void addStopListener(Runnable lsnr) {
- assert lsnr != null;
-
- if (state == GridKernalState.STARTING || state == GridKernalState.STARTED)
- synchronized (lsnrs) {
- lsnrs.add(lsnr);
- }
- else
- // Call right away in the same thread.
- lsnr.run();
+ @Override public String userStackTrace() {
+ return stackTrace;
}
- /** {@inheritDoc} */
- @Override public void removeStopListener(Runnable lsnr) {
- assert lsnr != null;
-
- synchronized (lsnrs) {
- lsnrs.remove(lsnr);
+ /**
+ * @param err If {@code true} throws {@link IllegalStateException} if not started (NOTE(review): with {@code false} the loop busy-spins while the state is neither STARTED nor DISCONNECTED).
+ */
+ private void checkState(boolean err) {
+ if (state != GridKernalState.STARTED) {
+ do {
+ if (state == GridKernalState.DISCONNECTED) {
+ rwLock.readUnlock();
+
+ try {
+ synchronized (this) {
+ while (state == GridKernalState.DISCONNECTED)
+ this.wait();
+ }
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+
+ rwLock.readLock();
+ }
+ else if (err) {
+ rwLock.readUnlock();
+
+ throw illegalState();
+ }
+ }
+ while (state != GridKernalState.STARTED);
+ }
+ }
/** {@inheritDoc} */
- @Override public String userStackTrace() {
- return stackTrace;
- }
-
- /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridKernalGatewayImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java
index fbb8f45..7d63578 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java
@@ -32,6 +32,9 @@ public enum GridKernalState {
/** Kernal is stopping. */
STOPPING,
+ /** Kernal is disconnected. */
+ DISCONNECTED,
+
/** Kernal is stopped.
* <p>
* This is also the initial state of the kernal.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
index b438bc1..709ce3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
@@ -64,6 +64,16 @@ public class GridPluginComponent implements GridComponent {
}
/** {@inheritDoc} */
+ @Override public void onDisconnected() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
plugin.onIgniteStop(cancel);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java
new file mode 100644
index 0000000..0684356
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+
+/**
+ * Checked exception carrying only a message; presumably reports a disconnected client node (usage not visible here, confirm).
+ */
+public class IgniteDisconnectedCheckedException extends IgniteCheckedException {
+ /**
+ * @param msg Message.
+ */
+ public IgniteDisconnectedCheckedException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e19d3d3..821a1f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.session.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -439,7 +440,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
assert cfg != null;
return F.transform(cfg.getUserAttributes().entrySet(), new C1<Map.Entry<String, ?>, String>() {
- @Override public String apply(Map.Entry<String, ?> e) {
+ @Override
+ public String apply(Map.Entry<String, ?> e) {
return e.getKey() + ", " + e.getValue().toString();
}
});
@@ -2800,6 +2802,105 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/**
+ * Notifies the gateway and then each component of the disconnect; a component failure ends the loop and is only printed.
+ */
+ public void disconnected() {
+ ctx.gateway().onDisconnected();
+
+ try {
+ for (GridComponent comp : ctx.components())
+ comp.onDisconnected();
+ }
+ catch (IgniteCheckedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void stopOnDisconnect() {
+ GridCacheProcessor cacheProcessor = ctx.cache();
+
+ List<GridComponent> comps = ctx.components();
+
+ // Callback component in reverse order while kernal is still functional
+ // if called in the same thread, at least.
+ for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) {
+ GridComponent comp = it.previous();
+
+ try {
+ if (!skipDaemon(comp) && (!(comp instanceof GridManager)))
+ comp.onKernalStop(true);
+ }
+ catch (Throwable e) {
+ errOnStop = true;
+
+ U.error(log, "Failed to pre-stop processor: " + comp, e);
+
+ if (e instanceof Error)
+ throw e;
+ }
+ }
+
+ if (cacheProcessor != null)
+ cacheProcessor.cancelUserOperations();
+
+ for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) {
+ GridComponent comp = it.previous();
+
+ try {
+ if (!skipDaemon(comp) && (!(comp instanceof GridManager))) {
+ comp.stop(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Component stopped: " + comp);
+ }
+ }
+ catch (Throwable e) {
+ errOnStop = true;
+
+ U.error(log, "Failed to stop component (ignoring): " + comp, e);
+
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
+
+ ctx.marshallerContext().onDisconnected();
+ }
+
+ private void restart() throws IgniteCheckedException {
+ List<PluginProvider> plugins = U.allPluginProviders();
+
+ startProcessor(new ClusterProcessor(ctx));
+
+ GridResourceProcessor rsrcProc = new GridResourceProcessor(ctx);
+
+ rsrcProc.setSpringContext(rsrcCtx);
+
+ scheduler = new IgniteSchedulerImpl(ctx);
+
+ startProcessor(rsrcProc);
+ }
+
+ /**
+ * Starts a new thread that notifies the gateway and then each component of the reconnect; a failure is only printed.
+ */
+ public void reconnected() {
+ new Thread() {
+ public void run() {
+ try {
+ ctx.gateway().onReconnected();
+
+ for (GridComponent comp : ctx.components())
+ comp.onReconnected();
+ }
+ catch (IgniteCheckedException e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ }
+
+ /**
* Creates optional component.
*
* @param cls Component interface.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 9f7c983..948babc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -32,7 +32,7 @@ import java.util.concurrent.*;
*/
public class MarshallerContextImpl extends MarshallerContextAdapter {
/** */
- private final CountDownLatch latch = new CountDownLatch(1);
+ private CountDownLatch latch = new CountDownLatch(1);
/** */
private final File workDir;
@@ -57,6 +57,15 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
}
/**
+ * Replaces the latch with a new single-count latch and clears the {@code cache} reference.
+ */
+ public void onDisconnected() {
+ latch = new CountDownLatch(1);
+
+ cache = null;
+ }
+
+ /**
* @param ctx Kernal context.
* @throws IgniteCheckedException In case of error.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 40a5ea5..d886f4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -166,6 +166,16 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
// No-op.
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected() throws IgniteCheckedException {
+ // No-op.
+ }
+
/**
* Starts wrapped SPI.
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
index 75fe98f..b82090f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
@@ -110,20 +110,24 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
}
/** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- GridProtocolHandler.deregisterDeploymentManager();
+ @Override public void onDisconnected() throws IgniteCheckedException {
+ storesOnKernalStop();
- if (verStore != null)
- verStore.stop();
+ storesStop();
- if (ldrStore != null)
- ldrStore.stop();
+ startStores();
+ }
- if (locStore != null)
- locStore.stop();
+ /** {@inheritDoc} */
+ @Override public void onReconnected() throws IgniteCheckedException {
+ storesOnKernalStart();
+ }
- if (comm != null)
- comm.stop();
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ GridProtocolHandler.deregisterDeploymentManager();
+
+ storesStop();
getSpi().setListener(null);
@@ -135,21 +139,12 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
/** {@inheritDoc} */
@Override public void onKernalStart0() throws IgniteCheckedException {
- locStore.onKernalStart();
- ldrStore.onKernalStart();
- verStore.onKernalStart();
+ storesOnKernalStart();
}
/** {@inheritDoc} */
@Override public void onKernalStop0(boolean cancel) {
- if (verStore != null)
- verStore.onKernalStop();
-
- if (ldrStore != null)
- ldrStore.onKernalStop();
-
- if (locStore != null)
- locStore.onKernalStop();
+ storesOnKernalStop();
}
/** {@inheritDoc} */
@@ -547,6 +542,57 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
return ldr instanceof GridDeploymentClassLoader;
}
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void startStores() throws IgniteCheckedException {
+ locStore = new GridDeploymentLocalStore(getSpi(), ctx, comm);
+ ldrStore = new GridDeploymentPerLoaderStore(getSpi(), ctx, comm);
+ verStore = new GridDeploymentPerVersionStore(getSpi(), ctx, comm);
+
+ locStore.start();
+ ldrStore.start();
+ verStore.start();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void storesOnKernalStart() throws IgniteCheckedException {
+ locStore.onKernalStart();
+ ldrStore.onKernalStart();
+ verStore.onKernalStart();
+ }
+
+ /**
+ * Calls {@code onKernalStop()} on the version, loader and local stores, in that order, skipping stores that are null.
+ */
+ private void storesOnKernalStop() {
+ if (verStore != null)
+ verStore.onKernalStop();
+
+ if (ldrStore != null)
+ ldrStore.onKernalStop();
+
+ if (locStore != null)
+ locStore.onKernalStop();
+ }
+
+ /**
+ *
+ */
+ private void storesStop() {
+ if (verStore != null)
+ verStore.stop();
+
+ if (ldrStore != null)
+ ldrStore.stop();
+
+ if (locStore != null)
+ locStore.stop();
+ }
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 1e4b972..7a524a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -287,6 +287,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/** {@inheritDoc} */
+ @Override public void onDisconnected() throws IgniteCheckedException {
+ locJoinEvt = new GridFutureAdapter<>();
+ }
+
+ /** {@inheritDoc} */
@Override protected void onKernalStart0() throws IgniteCheckedException {
if (Boolean.TRUE.equals(ctx.config().isClientMode()) && !getSpi().isClientMode())
ctx.performance().add("Enable client mode for TcpDiscoverySpi " +
@@ -386,7 +391,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
verChanged = false;
}
else {
- if (type != EVT_NODE_SEGMENTED) {
+ if (type != EVT_NODE_SEGMENTED &&
+ type != EVT_CLIENT_NODE_DISCONNECTED &&
+ type != EVT_CLIENT_NODE_RECONNECTED) {
minorTopVer = 0;
verChanged = true;
@@ -1693,6 +1700,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
else if (type == EVT_NODE_SEGMENTED)
evt.message("Node segmented: " + node);
+ else if (type == EVT_CLIENT_NODE_DISCONNECTED)
+ evt.message("Client node disconnected: " + node);
+
+ else if (type == EVT_CLIENT_NODE_RECONNECTED)
+ evt.message("Client node reconnected: " + node);
+
else
assert false;
@@ -1862,6 +1875,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
break;
}
+ case EVT_CLIENT_NODE_DISCONNECTED: {
+ assert localNode().isClient() : evt;
+
+ ((IgniteKernal)ctx.grid()).disconnected();
+
+ break;
+ }
+
+ case EVT_CLIENT_NODE_RECONNECTED: {
+ assert localNode().isClient() : evt;
+
+ ((IgniteKernal)ctx.grid()).reconnected();
+
+ break;
+ }
+
case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
if (ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) {
DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index a84c48a..04a39d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -62,6 +62,16 @@ public abstract class GridProcessorAdapter implements GridProcessor {
}
/** {@inheritDoc} */
+ @Override public void onDisconnected() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 7335d72..d754e3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -906,12 +906,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public Set<K> keySet() {
- return keySet((CacheEntryPredicate[])null);
+ return keySet((CacheEntryPredicate[]) null);
}
/** {@inheritDoc} */
@Override public Set<K> keySetx() {
- return keySetx((CacheEntryPredicate[])null);
+ return keySetx((CacheEntryPredicate[]) null);
}
/** {@inheritDoc} */
@@ -1217,7 +1217,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null,
taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
- @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+ @Override
+ public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
return e.get().get(key);
}
});
@@ -1259,11 +1260,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
String taskName,
final IgniteBiInClosure<KeyCacheObject, Object> vis) {
return ctx.closures().callLocalSafe(new GPC<Object>() {
- @Nullable @Override public Object call() {
+ @Nullable
+ @Override
+ public Object call() {
try {
ctx.store().loadAll(tx, keys, vis);
- }
- catch (IgniteCheckedException e) {
+ } catch (IgniteCheckedException e) {
throw new GridClosureException(e);
}
@@ -1465,8 +1467,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (ctx.config().getInterceptor() != null)
fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() {
- @Override public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException {
- return (V)ctx.config().getInterceptor().onGet(key, f.get());
+ @Override
+ public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException {
+ return (V) ctx.config().getInterceptor().onGet(key, f.get());
}
});
@@ -1978,12 +1981,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return asyncOp(new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, filter)
- .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
+ .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']';
}
});
@@ -2041,11 +2046,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(drMap.size() == 1) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.putAllDrAsync(ctx, drMap).get();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putAllConflict [drMap=" + drMap + ']';
}
});
@@ -2060,11 +2067,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
return asyncOp(new AsyncInOp(drMap.keySet()) {
- @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
return tx.putAllDrAsync(ctx, drMap);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putAllConflictAsync [drMap=" + drMap + ']';
}
});
@@ -2081,7 +2090,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
- @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
+ @Nullable
+ @Override
+ public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor);
@@ -2113,11 +2124,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
- @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ @Nullable
+ @Override
+ public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
new C1<K, EntryProcessor<K, V, Object>>() {
- @Override public EntryProcessor apply(K k) {
+ @Override
+ public EntryProcessor apply(K k) {
return entryProcessor;
}
});
@@ -2158,7 +2172,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut;
return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, EntryProcessorResult<T>>() {
- @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut)
+ @Override
+ public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut)
throws IgniteCheckedException {
GridCacheReturn ret = fut.get();
@@ -2188,7 +2203,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp(keys) {
@Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
- @Override public EntryProcessor apply(K k) {
+ @Override
+ public EntryProcessor apply(K k) {
return entryProcessor;
}
});
@@ -2205,7 +2221,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
(IgniteInternalFuture<GridCacheReturn>)fut;
return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() {
- @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
+ @Override
+ public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
throws IgniteCheckedException {
GridCacheReturn ret = fut.get();
@@ -2238,7 +2255,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut;
return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() {
- @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
+ @Override
+ public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
throws IgniteCheckedException {
GridCacheReturn ret = fut.get();
@@ -2259,10 +2277,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(map.keySet());
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) {
- @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ @Nullable
+ @Override
+ public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
IgniteInternalFuture<GridCacheReturn> fut =
- tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args);
+ tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>) map, args);
return fut.get().value();
}
@@ -2310,12 +2330,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, filter).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putxAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']';
}
});
@@ -2345,11 +2367,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return syncOp(new SyncOp<V>(true) {
- @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return (V)tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value();
+ @Override
+ public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ return (V) tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putIfAbsent [key=" + key + ", val=" + val + ']';
}
});
@@ -2369,12 +2393,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray())
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putIfAbsentAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2399,11 +2425,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
Boolean stored = syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).get().success();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putxIfAbsent [key=" + key + ", val=" + val + ']';
}
});
@@ -2428,12 +2456,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putxIfAbsentAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2504,11 +2534,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).get().success();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "replacex [key=" + key + ", val=" + val + ']';
}
});
@@ -2524,12 +2556,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "replacexAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2547,7 +2581,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(newVal);
return syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
@@ -2556,7 +2591,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
.success();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
}
});
@@ -2619,11 +2655,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValues(m.values());
syncOp(new SyncInOp(m.size() == 1) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.putAllAsync(ctx, m, false, null, -1, CU.empty0()).get();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "putAll [map=" + m + ']';
}
});
@@ -2665,16 +2703,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
V prevVal = syncOp(new SyncOp<V>(true) {
- @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
V ret = tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0()).get().value();
if (ctx.config().getInterceptor() != null)
- return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
+ return (V) ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
return ret;
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "remove [key=" + key + ']';
}
});
@@ -2697,13 +2737,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
// TODO should we invoke interceptor here?
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0())
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAsync [key=" + key + ']';
}
});
@@ -2745,11 +2787,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
syncOp(new SyncInOp(keys.size() == 1) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).get();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAll [keys=" + keys + ']';
}
});
@@ -2771,11 +2815,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
IgniteInternalFuture<Object> fut = asyncOp(new AsyncInOp(keys) {
- @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
return tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).chain(RET2NULL);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAllAsync [keys=" + keys + ']';
}
});
@@ -2798,11 +2844,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
boolean rmv = syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, CU.empty0()).get().success();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removex [key=" + key + ']';
}
});
@@ -2836,12 +2884,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, filter).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAsync [key=" + key + ", filter=" + Arrays.toString(filter) + ']';
}
});
@@ -2860,19 +2910,21 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<GridCacheReturn>(true) {
- @Override public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
- return (GridCacheReturn)tx.removeAllAsync(ctx,
+ return (GridCacheReturn) tx.removeAllAsync(ctx,
Collections.singletonList(key),
null,
true,
ctx.equalsValArray(val)).get();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "remove [key=" + key + ", val=" + val + ']';
}
});
@@ -2887,11 +2939,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(false) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.removeAllDrAsync(ctx, drMap).get();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAllConflict [drMap=" + drMap + ']';
}
});
@@ -2906,11 +2960,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
return asyncOp(new AsyncInOp(drMap.keySet()) {
- @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
return tx.removeAllDrAsync(ctx, drMap);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAllDrASync [drMap=" + drMap + ']';
}
});
@@ -2926,20 +2982,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<GridCacheReturn>(true) {
- @Override public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
return (GridCacheReturn) tx.putAllAsync(ctx,
- F.t(key, newVal),
- true,
- null,
- -1,
- ctx.equalsValArray(oldVal)).get();
+ F.t(key, newVal),
+ true,
+ null,
+ -1,
+ ctx.equalsValArray(oldVal)).get();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
}
});
@@ -2953,17 +3011,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return asyncOp(new AsyncOp<GridCacheReturn>() {
- @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
// Register before hiding in the filter.
try {
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
- }
- catch (IgniteCheckedException e) {
+ } catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
- IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.removeAllAsync(ctx,
+ IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture) tx.removeAllAsync(ctx,
Collections.singletonList(key),
null,
true,
@@ -2972,7 +3030,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return fut;
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2987,17 +3046,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return asyncOp(new AsyncOp<GridCacheReturn>() {
- @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
// Register before hiding in the filter.
try {
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
- }
- catch (IgniteCheckedException e) {
+ } catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
- IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.putAllAsync(ctx,
+ IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture) tx.putAllAsync(ctx,
F.t(key, newVal),
true,
null,
@@ -3007,7 +3066,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return fut;
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "replaceAsync [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
}
});
@@ -3027,16 +3087,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
boolean rmv = syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override
+ public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
- ctx.equalsValArray(val)).get().success();
+ ctx.equalsValArray(val)).get().success();
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "remove [key=" + key + ", val=" + val + ']';
}
});
@@ -3061,23 +3123,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override
+ public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
// Register before hiding in the filter.
if (ctx.deploymentEnabled()) {
try {
ctx.deploy().registerClass(val);
- }
- catch (IgniteCheckedException e) {
+ } catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
}
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
ctx.equalsValArray(val)).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "removeAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -3247,10 +3310,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
TransactionConfiguration cfg = ctx.gridConfig().getTransactionConfiguration();
return txStart(
- concurrency,
- isolation,
- cfg.getDefaultTxTimeout(),
- 0
+ concurrency,
+ isolation,
+ cfg.getDefaultTxTimeout(),
+ 0
);
}
@@ -3686,19 +3749,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return F.iterator(iterator(),
new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() {
private IgniteCacheExpiryPolicy expiryPlc =
- ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
+ ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
- @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) {
+ @Override
+ public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) {
CacheOperationContext prev = ctx.gate().enter(opCtx);
try {
V val = localPeek(lazyEntry.getKey(), CachePeekModes.ONHEAP_ONLY, expiryPlc);
return new CacheEntryImpl<>(lazyEntry.getKey(), val);
- }
- catch (IgniteCheckedException e) {
+ } catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
- }
- finally {
+ } finally {
ctx.gate().leave(prev);
}
}
@@ -3722,20 +3784,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
.execute();
return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() {
- @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
+ @Override
+ protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
return new CacheEntryImpl<>(e.getKey(), e.getValue());
}
- @Override protected void remove(Cache.Entry<K, V> item) {
+ @Override
+ protected void remove(Cache.Entry<K, V> item) {
CacheOperationContext prev = ctx.gate().enter(opCtx);
try {
GridCacheAdapter.this.remove(item.getKey());
- }
- catch (IgniteCheckedException e) {
+ } catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
- }
- finally {
+ } finally {
ctx.gate().leave(prev);
}
}
@@ -4380,7 +4442,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return getAllAsync(Collections.singletonList(key), deserializePortable).chain(
new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
- @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+ @Override
+ public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
Map<K, V> map = e.get();
assert map.isEmpty() || map.size() == 1 : map.size();
@@ -4428,6 +4491,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
public abstract void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver);
/**
+ *
+ */
+ public void disconnected() {
+ // No-op.
+ }
+
+ /**
* Validates that given cache value implements {@link Externalizable}.
*
* @param val Cache value.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
index ff109ed..475a6e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
@@ -116,6 +116,11 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
}
/** {@inheritDoc} */
+ @Override public boolean restartOnDisconnect() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
if (discoLsnr != null)
cctx.gridEvents().removeLocalEventListener(discoLsnr);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index d9d151c..d63e818 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -33,7 +33,7 @@ public class GridCacheGateway<K, V> {
private final GridCacheContext<K, V> ctx;
/** Stopped flag for dynamic caches. */
- private volatile boolean stopped;
+ private volatile State state = State.STARTED;
/** */
private GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
@@ -56,11 +56,46 @@ public class GridCacheGateway<K, V> {
rwLock.readLock();
- if (stopped) {
- rwLock.readUnlock();
+ checkState(true, true);
+ }
- throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name());
+ /**
+ *
+ */
+ private boolean checkState(boolean lock, boolean err) {
+ if (state != State.STARTED) {
+ do {
+ if (state == State.STOPPED) {
+ if (lock)
+ rwLock.readUnlock();
+
+ if (err)
+ throw new IllegalStateException("Cache has been stopped: " + ctx.name());
+ else
+ return false;
+ }
+ else {
+ if (lock)
+ rwLock.readUnlock();
+
+ try {
+ synchronized (this) {
+ while (state == State.DISCONNECTED)
+ wait();
+ }
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+
+ if (lock)
+ rwLock.readLock();
+ }
+ }
+ while (state != State.STARTED);
}
+
+ return true;
}
/**
@@ -71,17 +106,11 @@ public class GridCacheGateway<K, V> {
public boolean enterIfNotClosed() {
onEnter();
- // Must unlock in case of unexpected errors to avoid
- // deadlocks during kernal stop.
+ // Must unlock in case of unexpected errors to avoid deadlocks during kernal stop.
rwLock.readLock();
- if (stopped) {
- rwLock.readUnlock();
+ return checkState(true, false);
- return false;
- }
-
- return true;
}
/**
@@ -92,7 +121,7 @@ public class GridCacheGateway<K, V> {
public boolean enterIfNotClosedNoLock() {
onEnter();
- return !stopped;
+ return checkState(false, false);
}
/**
@@ -144,11 +173,7 @@ public class GridCacheGateway<K, V> {
rwLock.readLock();
- if (stopped) {
- rwLock.readUnlock();
-
- throw new IllegalStateException("Cache has been stopped: " + ctx.name());
- }
+ checkState(true, true);
// Must unlock in case of unexpected errors to avoid
// deadlocks during kernal stop.
@@ -169,8 +194,7 @@ public class GridCacheGateway<K, V> {
@Nullable public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) {
onEnter();
- if (stopped)
- throw new IllegalStateException("Cache has been stopped: " + ctx.name());
+ checkState(false, false);
return setOperationContextPerCall(opCtx);
}
@@ -229,8 +253,48 @@ public class GridCacheGateway<K, V> {
/**
*
*/
- public void block() {
- stopped = true;
+ public void stopped() {
+ state = State.STOPPED;
+
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+
+ /**
+ *
+ */
+ public void onDisconnected() {
+ if (state == State.STARTED)
+ state = State.DISCONNECTED;
+ }
+
+ /**
+ *
+ */
+ public void waitOperations() {
+ rwLock.writeLock();
+
+ rwLock.writeUnlock();
+ }
+
+ /**
+ * @param stopped Cache stopped flag.
+ */
+ public void reconnected(boolean stopped) {
+ rwLock.writeLock();
+
+ try {
+ if (state == State.DISCONNECTED)
+ state = stopped ? State.STOPPED : State.STARTED;
+ }
+ finally {
+ rwLock.writeUnlock();
+ }
+
+ synchronized (this) {
+ notifyAll();
+ }
}
/**
@@ -257,10 +321,28 @@ public class GridCacheGateway<K, V> {
try {
// No-op.
- stopped = true;
+ state = State.STOPPED;
}
finally {
rwLock.writeUnlock();
}
+
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+
+ /**
+ *
+ */
+ private enum State {
+ /** */
+ STARTED,
+
+ /** */
+ DISCONNECTED,
+
+ /** */
+ STOPPED
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 74a4512..48a16d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -170,7 +170,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
/** {@inheritDoc} */
@SuppressWarnings("BusyWait")
- @Override protected void onKernalStop0(boolean cancel) {
+ @Override protected void onKernalStop0(boolean cancel, boolean disconnected) {
cctx.gridIO().removeMessageListener(TOPIC_CACHE);
for (Object ordTopic : orderedHandlers.keySet())
@@ -891,6 +891,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param cacheId Cache ID to remove handlers for.
+ */
+ public void removeHandler(int cacheId, Class<? extends GridCacheMessage> type) {
+ clsHandlers.remove(new ListenerKey(cacheId, type));
+ }
+
+ /**
* @param msgCls Message class to check.
* @return Message index.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index c528e08..e2d22dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -216,7 +216,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/** {@inheritDoc} */
- @Override public void onKernalStop0(boolean cancel) {
+ @Override public void onKernalStop0(boolean cancel, boolean disconnected) {
cctx.gridEvents().removeLocalEventListener(discoLsnr);
}
@@ -293,9 +293,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/**
* Cancels all client futures.
+ *
+ * @param stop If {@code true} node is stopping, otherwise disconnected.
*/
- public void cancelClientFutures() {
- IgniteCheckedException e = new IgniteCheckedException("Operation has been cancelled (grid is stopping).");
+ public void cancelClientFutures(boolean stop) {
+ IgniteCheckedException e = stop ?
+ new IgniteCheckedException("Operation has been cancelled (node is stopping).") :
+ new IgniteCheckedException("Operation has been cancelled (node disconnected).");
for (Collection<GridCacheFuture<?>> futures : futs.values()) {
for (GridCacheFuture<?> future : futures)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index af87685..f0c9b3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -207,6 +207,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
};
/** {@inheritDoc} */
+ @Override public boolean restartOnDisconnect() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
super.start0();
@@ -281,6 +286,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
break;
}
+ catch (IgniteDisconnectedCheckedException e) {
+ log.info("Disconnected while waiting for initial partition map exchange: " + e);
+
+ break;
+ }
catch (IgniteFutureTimeoutCheckedException ignored) {
if (first) {
U.warn(log, "Failed to wait for initial partition map exchange. " +
@@ -313,13 +323,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/** {@inheritDoc} */
- @Override protected void onKernalStop0(boolean cancel) {
+ @Override protected void onKernalStop0(boolean cancel, boolean disconnected) {
+ cctx.gridEvents().removeLocalEventListener(discoLsnr);
+
+ cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class);
+ cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
+ cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
+
+ IgniteCheckedException err = disconnected ?
+ new IgniteDisconnectedCheckedException("Node disconnected: " + cctx.gridName()) :
+ new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
+
// Finish all exchange futures.
for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
- f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: " + cctx.gridName()));
+ f.onDone(err);
for (AffinityReadyFuture f : readyFuts.values())
- f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: " + cctx.gridName()));
+ f.onDone(err);
U.cancel(exchWorker);
@@ -1099,6 +1119,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
catch (IgniteInterruptedCheckedException e) {
throw e;
}
+ catch (IgniteDisconnectedCheckedException e) {
+ return;
+ }
catch (IgniteCheckedException e) {
U.error(log, "Failed to wait for completion of partition map exchange " +
"(preloading will not start): " + exchFut, e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 2f7f22c..e11a221 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -341,8 +341,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
"Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " +
"(most likely misconfiguration - either update 'isTxSerializableEnabled' or " +
"'defaultTxIsolationLevel' properties) for cache: " + U.maskName(cc.getName()),
- "Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " +
- "for cache: " + U.maskName(cc.getName()));
+ "Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " +
+ "for cache: " + U.maskName(cc.getName()));
if (cc.isWriteBehindEnabled()) {
if (cfgStore == null)
@@ -567,8 +567,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
- sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx,
- ctx.config().getCacheStoreSessionListenerFactories()));
+ sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(
+ ctx, ctx.config().getCacheStoreSessionListenerFactories()));
ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)",
!ctx.config().getTransactionConfiguration().isTxSerializableEnabled());
@@ -871,10 +871,42 @@ public class GridCacheProcessor extends GridProcessorAdapter {
it.hasPrevious();) {
GridCacheSharedManager<?, ?> mgr = it.previous();
- mgr.onKernalStop(cancel);
+ mgr.onKernalStop(cancel, false);
}
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected() throws IgniteCheckedException {
+ for (GridCacheAdapter cache : caches.values())
+ cache.context().gate().onDisconnected();
+
+ sharedCtx.mvcc().cancelClientFutures(false);
+
+ for (GridCacheAdapter cache : caches.values())
+ cache.disconnected();
+
+ registeredCaches.clear();
+
+ sharedCtx.onDisconnected();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected() throws IgniteCheckedException {
+ for (GridCacheAdapter cache : caches.values())
+ cache.context().gate().reconnected(false);
+
+ ctx.marshallerContext().onMarshallerCacheStarted(ctx);
+
+ marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() {
+ @Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException {
+ ctx.marshallerContext().onMarshallerCachePreloaded(ctx);
+ }
+ });
+
+ for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
+ mgr.onKernalStart();
+ }
+
/**
* @param cache Cache to start.
* @throws IgniteCheckedException If failed to start cache.
@@ -1487,7 +1519,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName()));
if (proxy != null)
- proxy.gate().block();
+ proxy.gate().stopped();
}
/**
@@ -1591,7 +1623,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Shared context.
*/
@SuppressWarnings("unchecked")
- private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,
+ private GridCacheSharedContext createSharedContext(
+ GridKernalContext kernalCtx,
Collection<CacheStoreSessionListener> storeSesLsnrs) {
IgniteTxManager tm = new IgniteTxManager();
GridCacheMvccManager mvccMgr = new GridCacheMvccManager();
@@ -2796,8 +2829,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Cancel all user operations.
*/
public void cancelUserOperations() {
- for (GridCacheAdapter<?, ?> cache : caches.values())
- cache.ctx.mvcc().cancelClientFutures();
+ sharedCtx.mvcc().cancelClientFutures(true);
}
/**