You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2018/04/13 21:48:24 UTC
ignite git commit: IGNITE-2766 - Opportunistically reopen cache after
client reconnect - Fixes #3417
Repository: ignite
Updated Branches:
refs/heads/master c6ab036dc -> 0991437a3
IGNITE-2766 - Opportunistically reopen cache after client reconnect - Fixes #3417
Signed-off-by: Valentin Kulichenko <va...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0991437a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0991437a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0991437a
Branch: refs/heads/master
Commit: 0991437a3f4d38e68483a8bcadd3daf614b7b2dc
Parents: c6ab036
Author: Ilya Kasnacheev <il...@gmail.com>
Authored: Fri Apr 13 14:48:10 2018 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Fri Apr 13 14:48:10 2018 -0700
----------------------------------------------------------------------
.../cache/GatewayProtectedCacheProxy.java | 676 ++++++++-----------
.../processors/cache/GridCacheGateway.java | 7 +
.../processors/cache/IgniteCacheProxyImpl.java | 31 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 6 +-
.../IgniteCacheQueryCacheDestroySelfTest.java | 4 +
.../ClientReconnectAfterClusterRestartTest.java | 33 +-
6 files changed, 316 insertions(+), 441 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0991437a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
index 27fc395..2e8120b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
@@ -36,6 +36,7 @@ import javax.cache.integration.CompletionListener;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMetrics;
@@ -48,6 +49,9 @@ import org.apache.ignite.cache.query.QueryMetrics;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.internal.AsyncSupportAdapter;
+import org.apache.ignite.internal.GridKernalState;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
@@ -138,15 +142,13 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
/** {@inheritDoc} */
@Override public GatewayProtectedCacheProxy<K, V> withExpiryPolicy(ExpiryPolicy plc) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return new GatewayProtectedCacheProxy<>(delegate, opCtx.withExpiryPolicy(plc), lock);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
@@ -157,9 +159,7 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
/** {@inheritDoc} */
@Override public GatewayProtectedCacheProxy<K, V> skipStore() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
boolean skip = opCtx.skipStore();
@@ -170,15 +170,13 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
return new GatewayProtectedCacheProxy<>(delegate, opCtx.setSkipStore(true), lock);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public GatewayProtectedCacheProxy<K, V> withNoRetries() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
boolean noRetries = opCtx.noRetries();
@@ -189,15 +187,13 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
return new GatewayProtectedCacheProxy<>(delegate, opCtx.setNoRetries(true), lock);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public GatewayProtectedCacheProxy<K, V> withPartitionRecover() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
boolean recovery = opCtx.recovery();
@@ -208,7 +204,7 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
return new GatewayProtectedCacheProxy<>(delegate, opCtx.setRecovery(true), lock);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
@@ -219,23 +215,19 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
/** {@inheritDoc} */
@Override public <K1, V1> GatewayProtectedCacheProxy<K1, V1> keepBinary() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return new GatewayProtectedCacheProxy<>((IgniteCacheProxy<K1, V1>) delegate, opCtx.keepBinary(), lock);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public GatewayProtectedCacheProxy<K, V> withDataCenterId(byte dataCenterId) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
Byte prevDataCenterId = opCtx.dataCenterId();
@@ -246,91 +238,79 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
return new GatewayProtectedCacheProxy<>(delegate, opCtx.setDataCenterId(dataCenterId), lock);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.loadCache(p, args);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Void> loadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.loadCacheAsync(p, args);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.localLoadCache(p, args);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Void> localLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.localLoadCacheAsync(p, args);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public V getAndPutIfAbsent(K key, V val) throws CacheException, TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAndPutIfAbsent(key, val);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<V> getAndPutIfAbsentAsync(K key, V val) throws CacheException, TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAndPutIfAbsentAsync(key, val);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
@@ -346,1093 +326,937 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
/** {@inheritDoc} */
@Override public boolean isLocalLocked(K key, boolean byCurrThread) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.isLocalLocked(key, byCurrThread);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public <R> QueryCursor<R> query(Query<R> qry) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.query(qry);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.query(qry);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public List<FieldsQueryCursor<List<?>>> queryMultipleStatements(SqlFieldsQuery qry) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.queryMultipleStatements(qry);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.query(qry, transformer);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.localEntries(peekModes);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public QueryMetrics queryMetrics() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.queryMetrics();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void resetQueryMetrics() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.resetQueryMetrics();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public Collection<? extends QueryDetailMetrics> queryDetailMetrics() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.queryDetailMetrics();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void resetQueryDetailMetrics() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.resetQueryDetailMetrics();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void localEvict(Collection<? extends K> keys) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.localEvict(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public V localPeek(K key, CachePeekMode... peekModes) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.localPeek(key, peekModes);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public int size(CachePeekMode... peekModes) throws CacheException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.size(peekModes);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Integer> sizeAsync(CachePeekMode... peekModes) throws CacheException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.sizeAsync(peekModes);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public long sizeLong(CachePeekMode... peekModes) throws CacheException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.sizeLong(peekModes);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Long> sizeLongAsync(CachePeekMode... peekModes) throws CacheException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.sizeLongAsync(peekModes);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public long sizeLong(int partition, CachePeekMode... peekModes) throws CacheException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.sizeLong(partition, peekModes);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Long> sizeLongAsync(int partition, CachePeekMode... peekModes) throws CacheException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.sizeLongAsync(partition, peekModes);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public int localSize(CachePeekMode... peekModes) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.localSize(peekModes);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public long localSizeLong(CachePeekMode... peekModes) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.localSizeLong(peekModes);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public long localSizeLong(int partition, CachePeekMode... peekModes) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.localSizeLong(partition, peekModes);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.invokeAll(map, args);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.invokeAllAsync(map, args);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public V get(K key) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.get(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<V> getAsync(K key) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAsync(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public CacheEntry<K, V> getEntry(K key) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getEntry(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<CacheEntry<K, V>> getEntryAsync(K key) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getEntryAsync(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public Map<K, V> getAll(Set<? extends K> keys) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAll(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Map<K, V>> getAllAsync(Set<? extends K> keys) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAllAsync(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getEntries(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(Set<? extends K> keys) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getEntriesAsync(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAllOutTx(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAllOutTxAsync(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public boolean containsKey(K key) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.containsKey(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void loadAll(Set<? extends K> keys, boolean replaceExisting, CompletionListener completionListener) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.loadAll(keys, replaceExisting, completionListener);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> containsKeyAsync(K key) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.containsKeyAsync(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public boolean containsKeys(Set<? extends K> keys) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.containsKeys(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> containsKeysAsync(Set<? extends K> keys) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.containsKeysAsync(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void put(K key, V val) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.put(key, val);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Void> putAsync(K key, V val) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.putAsync(key, val);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public V getAndPut(K key, V val) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAndPut(key, val);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<V> getAndPutAsync(K key, V val) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAndPutAsync(key, val);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void putAll(Map<? extends K, ? extends V> map) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.putAll(map);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.putAllAsync(map);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public boolean putIfAbsent(K key, V val) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.putIfAbsent(key, val);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> putIfAbsentAsync(K key, V val) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.putIfAbsentAsync(key, val);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public boolean remove(K key) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.remove(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> removeAsync(K key) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.removeAsync(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public boolean remove(K key, V oldVal) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.remove(key, oldVal);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> removeAsync(K key, V oldVal) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.removeAsync(key, oldVal);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public V getAndRemove(K key) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAndRemove(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<V> getAndRemoveAsync(K key) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAndRemoveAsync(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public boolean replace(K key, V oldVal, V newVal) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.replace(key, oldVal, newVal);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.replaceAsync(key, oldVal, newVal);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public boolean replace(K key, V val) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.replace(key, val);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Boolean> replaceAsync(K key, V val) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.replaceAsync(key, val);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public V getAndReplace(K key, V val) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAndReplace(key, val);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<V> getAndReplaceAsync(K key, V val) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.getAndReplaceAsync(key, val);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void removeAll(Set<? extends K> keys) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.removeAll(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Void> removeAllAsync(Set<? extends K> keys) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.removeAllAsync(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void removeAll() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.removeAll();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Void> removeAllAsync() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.removeAllAsync();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void clear() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.clear();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Void> clearAsync() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.clearAsync();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void clear(K key) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.clear(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Void> clearAsync(K key) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.clearAsync(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void clearAll(Set<? extends K> keys) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.clearAll(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public IgniteFuture<Void> clearAllAsync(Set<? extends K> keys) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.clearAllAsync(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void localClear(K key) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.localClear(key);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void localClearAll(Set<? extends K> keys) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.localClearAll(keys);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.invoke(key, entryProcessor, arguments);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public <T> IgniteFuture<T> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.invokeAsync(key, entryProcessor, arguments);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... arguments) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.invoke(key, entryProcessor, arguments);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public <T> IgniteFuture<T> invokeAsync(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... arguments) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.invokeAsync(key, entryProcessor, arguments);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.invokeAll(keys, entryProcessor, args);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.invokeAllAsync(keys, entryProcessor, args);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.invokeAll(keys, entryProcessor, args);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) throws TransactionException {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.invokeAllAsync(keys, entryProcessor, args);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
@@ -1443,43 +1267,37 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
/** {@inheritDoc} */
@Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.registerCacheEntryListener(cacheEntryListenerConfiguration);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.deregisterCacheEntryListener(cacheEntryListenerConfiguration);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public Iterator<Entry<K, V>> iterator() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.iterator();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
@@ -1550,99 +1368,85 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
/** {@inheritDoc} */
@Override public CacheMetrics metrics() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.metrics();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public CacheMetrics metrics(ClusterGroup grp) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.metrics(grp);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public CacheMetrics localMetrics() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.localMetrics();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public CacheMetricsMXBean mxBean() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.mxBean();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public CacheMetricsMXBean localMxBean() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.localMxBean();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public Collection<Integer> lostPartitions() {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
return delegate.lostPartitions();
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
/** {@inheritDoc} */
@Override public void enableStatistics(boolean enabled) {
- GridCacheGateway<K, V> gate = gate();
-
- CacheOperationContext prev = onEnter(gate, opCtx);
+ CacheOperationGate opGate = onEnter();
try {
delegate.enableStatistics(enabled);
}
finally {
- onLeave(gate, prev);
+ onLeave(opGate);
}
}
@@ -1662,26 +1466,49 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
*
* @param gate Cache gateway.
*/
- private void checkProxyIsValid(@Nullable GridCacheGateway<K, V> gate) {
+ private GridCacheGateway<K, V> checkProxyIsValid(@Nullable GridCacheGateway<K, V> gate, boolean tryRestart) {
if (isProxyClosed())
throw new IllegalStateException("Cache has been closed: " + context().name());
- if (delegate instanceof IgniteCacheProxyImpl)
+ boolean isCacheProxy = delegate instanceof IgniteCacheProxyImpl;
+
+ if (isCacheProxy)
((IgniteCacheProxyImpl) delegate).checkRestart();
if (gate == null)
throw new IllegalStateException("Gateway is unavailable. Probably cache has been destroyed, but proxy is not closed.");
+
+ if (isCacheProxy && tryRestart && gate.isStopped() &&
+ context().kernalContext().gateway().getState() == GridKernalState.STARTED) {
+ IgniteCacheProxyImpl proxyImpl = (IgniteCacheProxyImpl) delegate;
+
+ try {
+ IgniteInternalCache<K, V> cache = context().kernalContext().cache().<K, V>publicJCache(context().name()).internalProxy();
+
+ GridFutureAdapter<Void> fut = proxyImpl.opportunisticRestart();
+
+ if (fut == null)
+ proxyImpl.onRestarted(cache.context(), cache.context().cache());
+ else
+ new IgniteFutureImpl<>(fut).get();
+
+ return gate();
+ } catch (IgniteCheckedException ice) {
+ // Opportunity didn't work out.
+ }
+ }
+
+ return gate;
}
/**
- * @param gate Cache gateway.
- * @param opCtx Cache operation context to guard.
* @return Previous projection set on this thread.
*/
- private CacheOperationContext onEnter(@Nullable GridCacheGateway<K, V> gate, CacheOperationContext opCtx) {
- checkProxyIsValid(gate);
+ private CacheOperationGate onEnter() {
+ GridCacheGateway<K, V> gate = checkProxyIsValid(gate(), true);
- return lock ? gate.enter(opCtx) : gate.enterNoLock(opCtx);
+ return new CacheOperationGate(gate,
+ lock ? gate.enter(opCtx) : gate.enterNoLock(opCtx));
}
/**
@@ -1690,7 +1517,7 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
*/
private boolean onEnterIfNoStop(@Nullable GridCacheGateway<K, V> gate) {
try {
- checkProxyIsValid(gate);
+ checkProxyIsValid(gate, false);
}
catch (Exception e) {
return false;
@@ -1700,14 +1527,13 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
}
/**
- * @param gate Cache gateway.
- * @param opCtx Operation context to guard.
+ * @param opGate Operation context to guard.
*/
- private void onLeave(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) {
+ private void onLeave(CacheOperationGate opGate) {
if (lock)
- gate.leave(opCtx);
+ opGate.gate.leave(opGate.prev);
else
- gate.leaveNoLock(opCtx);
+ opGate.gate.leaveNoLock(opGate.prev);
}
/**
@@ -1774,4 +1600,28 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
@Override public int hashCode() {
return delegate.hashCode();
}
+
+ /**
+ * Holder for gate being entered and operation context to restore.
+ */
+ private class CacheOperationGate {
+ /**
+ * Gate being entered in this operation.
+ */
+ public final GridCacheGateway<K, V> gate;
+
+ /**
+ * Operation context to restore after current operation completes.
+ */
+ public final CacheOperationContext prev;
+
+ /**
+ * @param gate Gate being entered in this operation.
+ * @param prev Operation context to restore after current operation completes.
+ */
+ public CacheOperationGate(GridCacheGateway<K, V> gate, CacheOperationContext prev) {
+ this.gate = gate;
+ this.prev = prev;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0991437a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index b9a4b25..658ca2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -256,6 +256,13 @@ public class GridCacheGateway<K, V> {
/**
*
*/
+ public boolean isStopped() {
+ return !checkState(false, false);
+ }
+
+ /**
+ *
+ */
public void stopped() {
state.set(State.STOPPED);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0991437a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index be4b0db..68e5b85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -1824,8 +1824,10 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
* Throws {@code IgniteCacheRestartingException} if proxy is restarting.
*/
public void checkRestart() {
- if (isRestarting())
- throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut.get()), "Cache is restarting: " +
+ GridFutureAdapter<Void> currentFut = this.restartFut.get();
+
+ if (currentFut != null)
+ throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(currentFut), "Cache is restarting: " +
context().name());
}
@@ -1833,13 +1835,13 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
* @return True if proxy is restarting, false in other case.
*/
public boolean isRestarting() {
- return restartFut != null && restartFut.get() != null;
+ return restartFut.get() != null;
}
/**
* Restarts this cache proxy.
*/
- public void restart() {
+ public boolean restart() {
GridFutureAdapter<Void> restartFut = new GridFutureAdapter<>();
final GridFutureAdapter<Void> curFut = this.restartFut.get();
@@ -1855,6 +1857,27 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
curFut.onDone();
}
});
+
+ return changed;
+ }
+
+ /**
+ * If proxy is already being restarted, returns future to wait on, else restarts this cache proxy.
+ *
+ * @return Future to wait on, or null.
+ */
+ public GridFutureAdapter<Void> opportunisticRestart() {
+ GridFutureAdapter<Void> restartFut = new GridFutureAdapter<>();
+
+ while (true) {
+ if (this.restartFut.compareAndSet(null, restartFut))
+ return null;
+
+ GridFutureAdapter<Void> curFut = this.restartFut.get();
+
+ if (curFut != null)
+ return curFut;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0991437a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 16c5d3a..b22a397 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1616,8 +1616,6 @@ class ClientImpl extends TcpDiscoveryImpl {
onDisconnected();
- notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
-
UUID newId = UUID.randomUUID();
U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " +
@@ -1716,8 +1714,6 @@ class ClientImpl extends TcpDiscoveryImpl {
}
onDisconnected();
-
- notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
}
UUID newId = UUID.randomUUID();
@@ -1820,6 +1816,8 @@ class ClientImpl extends TcpDiscoveryImpl {
delayDiscoData.clear();
+ notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+
IgniteClientDisconnectedCheckedException err =
new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
"client node disconnected.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/0991437a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IgniteCacheQueryCacheDestroySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IgniteCacheQueryCacheDestroySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IgniteCacheQueryCacheDestroySelfTest.java
index dea491c..d0d392b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IgniteCacheQueryCacheDestroySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IgniteCacheQueryCacheDestroySelfTest.java
@@ -48,6 +48,10 @@ public class IgniteCacheQueryCacheDestroySelfTest extends GridCommonAbstractTest
/** */
public static final int GRID_CNT = 3;
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
/**
* The main test code.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/0991437a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
index 392cdc7..505d373 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.cache;
+import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
@@ -31,9 +33,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
@@ -119,6 +120,8 @@ public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTe
checkTopology(2);
+ IgniteCache<Long, BinaryObject> cache = client.getOrCreateCache(CACHE_PARAMS).withKeepBinary();
+
client.events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event event) {
@@ -161,27 +164,17 @@ public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTe
startGrid(0);
- assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- try {
- checkTopology(2);
-
- return true;
- } catch (Exception ex) {
- return false;
- }
- }
- }, 30_000);
+ try {
+ assertNull(cache.get(1L));
+ } catch (CacheException ce) {
+ IgniteClientDisconnectedException icde = (IgniteClientDisconnectedException)ce.getCause();
- info("Pre-insert");
+ icde.reconnectFuture().get();
- streamer = client.dataStreamer("PPRB_PARAMS");
- streamer.allowOverwrite(true);
- streamer.keepBinary(true);
- streamer.perNodeBufferSize(10000);
- streamer.perNodeParallelOperations(100);
+ assertNull(cache.get(1L));
+ }
- IgniteCache<Long, BinaryObject> cache = client.getOrCreateCache(CACHE_PARAMS).withKeepBinary();
+ info("Pre-insert");
builder = client.binary().builder("PARAMS");
builder.setField("ID", 2L);