You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2017/02/05 19:45:04 UTC
[2/9] ignite git commit: ignite-db-x fix IgniteCachePartitionLossPolicySelfTest, batch reset partition
ignite-db-x fix IgniteCachePartitionLossPolicySelfTest, batch reset partition
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a70dc00e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a70dc00e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a70dc00e
Branch: refs/heads/ignite-4652
Commit: a70dc00e1b699a9ad2959dd127241ec4371c0a1c
Parents: 16f64d1
Author: Dmitriy Govorukhin <dg...@gridgain.com>
Authored: Thu Feb 2 18:26:45 2017 +0300
Committer: Dmitriy Govorukhin <dg...@gridgain.com>
Committed: Thu Feb 2 18:26:45 2017 +0300
----------------------------------------------------------------------
.../src/main/java/org/apache/ignite/Ignite.java | 5 +++
.../java/org/apache/ignite/IgniteCache.java | 6 ---
.../apache/ignite/internal/IgniteKernal.java | 15 +++++++
.../processors/cache/GridCacheProcessor.java | 44 +++++++++++++++-----
.../processors/cache/IgniteCacheProxy.java | 26 ------------
.../GridDhtPartitionsExchangeFuture.java | 41 +++++++++++-------
.../IgniteCachePartitionLossPolicySelfTest.java | 14 +++----
.../processors/igfs/IgfsIgniteMock.java | 5 +++
.../ignite/testframework/junits/IgniteMock.java | 5 +++
.../multijvm/IgniteCacheProcessProxy.java | 5 ---
.../junits/multijvm/IgniteProcessProxy.java | 5 +++
11 files changed, 102 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 54512a3..408a797 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -612,4 +612,9 @@ public interface Ignite extends AutoCloseable {
* @param active If {@code True} start activation process. If {@code False} start deactivation process.
*/
public void active(boolean active);
+
+ /**
+ * Clears partition's lost state and moves caches to a normal mode.
+ */
+ public void resetLostPartitions(Collection<String> cacheNames);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index eca61ab..412e08d 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -933,10 +933,4 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* @return Lost paritions.
*/
public Collection<Integer> lostPartitions();
-
- /**
- * Clears partition's lost state and moves cache to a normal mode.
- */
- @IgniteAsyncSupported
- public void resetLostPartitions();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c11b770..ad10f7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3226,6 +3226,21 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/** {@inheritDoc} */
+ @Override public void resetLostPartitions(Collection<String> cacheNames) {
+ guard();
+
+ try {
+ ctx.cache().resetCacheState(cacheNames).get();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create) {
guard();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 57b0d84..542a18c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2570,22 +2570,43 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* Resets cache state after the cache has been moved to recovery state.
*
- * @param cacheName Cache name.
- * @return Future that will be completed when state is changed.
+ * @param cacheNames Cache names.
+ * @return Future that will be completed when state is changed for all caches.
*/
- public IgniteInternalFuture<?> resetCacheState(String cacheName) {
- IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(cacheName));
+ public IgniteInternalFuture<?> resetCacheState(Collection<String> cacheNames) {
+ checkEmptyTransactions();
- if (proxy == null || proxy.proxyClosed())
- return new GridFinishedFuture<>(); // No-op.
+ if (F.isEmpty(cacheNames))
+ cacheNames = registeredCaches.keySet();
- checkEmptyTransactions();
+ Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size());
- DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+ for (String cacheName : cacheNames) {
+ DynamicCacheDescriptor desc = registeredCaches.get(cacheName);
- t.markResetLostPartitions();
+ if (desc == null) {
+ log.warning("Reset lost partition will not be executed, " +
+ "because cache with name:" + cacheName + " doesn't not exist");
- return F.first(initiateCacheChanges(F.asList(t), false));
+ continue;
+ }
+
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
+ UUID.randomUUID(), cacheName, ctx.localNodeId());
+
+ req.markResetLostPartitions();
+
+ reqs.add(req);
+ }
+
+ GridCompoundFuture fut = new GridCompoundFuture();
+
+ for (DynamicCacheStartFuture f : initiateCacheChanges(reqs, false))
+ fut.add(f);
+
+ fut.markInitialized();
+
+ return fut;
}
/**
@@ -2690,7 +2711,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*/
@SuppressWarnings("TypeMayBeWeakened")
private Collection<DynamicCacheStartFuture> initiateCacheChanges(
- Collection<DynamicCacheChangeRequest> reqs, boolean failIfExists
+ Collection<DynamicCacheChangeRequest> reqs,
+ boolean failIfExists
) {
Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index dd92309..03cbbf7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -2307,32 +2307,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/** {@inheritDoc} */
- @Override public void resetLostPartitions() {
- GridCacheGateway<K, V> gate = this.gate;
-
- CacheOperationContext prev = onEnter(gate, opCtx);
-
- try {
- IgniteInternalFuture<?> fut = ctx.kernalContext().cache().resetCacheState(getName());
-
- if (isAsync())
- setFuture(fut);
- else {
- try {
- fut.get();
- }
- catch (IgniteCheckedException e) {
- throw CU.convertToCacheException(e);
- }
- }
- }
- finally {
- onLeave(gate, prev);
- }
-
- }
-
- /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(ctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 75fd3e2..af0085d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1041,6 +1041,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @throws IgniteCheckedException If failed.
*/
private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
+ assert node != null;
+
+ // Reset lost partition before send local partition to coordinator.
+ if (!F.isEmpty(reqs)) {
+ Set<String> caches = new HashSet<>();
+
+ for (DynamicCacheChangeRequest req : reqs) {
+ if (req.resetLostPartitions())
+ caches.add(req.cacheName());
+ }
+
+ if (!F.isEmpty(caches))
+ resetLostPartitions(caches);
+ }
+
GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(
node, exchangeId(), clientOnlyExchange, true);
@@ -1141,17 +1156,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
- if (!F.isEmpty(reqs)) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.resetLostPartitions()) {
- resetLostPartitions();
-
- break;
- }
- }
- }
-
- detectLostPartitions();
+ if (discoEvt.type() == EVT_NODE_LEFT ||
+ discoEvt.type() == EVT_NODE_FAILED ||
+ discoEvt.type() == EVT_NODE_JOINED)
+ detectLostPartitions();
Map<Integer, CacheValidation> m = new HashMap<>(cctx.cacheContexts().size());
@@ -1514,13 +1522,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
*
*/
- private void resetLostPartitions() {
+ private void resetLostPartitions(Collection<String> cacheNames) {
synchronized (cctx.exchange().interruptLock()) {
if (Thread.currentThread().isInterrupted())
return;
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal())
+ if (!cacheCtx.isLocal() && cacheNames.contains(cacheCtx.name()))
cacheCtx.topology().resetLostPartitions();
}
}
@@ -1551,12 +1559,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)((DiscoveryCustomEvent)discoEvt)
.customMessage();
+ Set<String> caches = new HashSet<>();
+
for (DynamicCacheChangeRequest req : batch.requests()) {
if (req.resetLostPartitions())
- resetLostPartitions();
+ caches.add(req.cacheName());
else if (req.globalStateChange() && req.state() != ClusterState.INACTIVE)
assignPartitionsStates();
}
+
+ if (!F.isEmpty(caches))
+ resetLostPartitions(caches);
}
}
else if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED)
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
index 5951ba3..480dc20 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
@@ -17,6 +17,12 @@
package org.apache.ignite.internal.processors.cache.distributed;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.PartitionLossPolicy;
@@ -38,12 +44,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.util.TestTcpCommunicationSpi;
-import javax.cache.CacheException;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -200,7 +200,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
for (Ignite ig : G.allGrids())
verifyCacheOps(canWrite, safe, part, ig);
- ignite(0).cache(CACHE_NAME).resetLostPartitions();
+ ignite(0).resetLostPartitions(Collections.singletonList(CACHE_NAME));
awaitPartitionMapExchange(true, true, null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
index 027072b..2a522fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
@@ -523,6 +523,11 @@ public class IgfsIgniteMock implements IgniteEx {
throwUnsupported();
}
+ /** {@inheritDoc} */
+ @Override public void resetLostPartitions(Collection<String> cacheNames) {
+ throwUnsupported();
+ }
+
/**
* Throw {@link UnsupportedOperationException}.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 772abcf..bf3f17e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -431,6 +431,11 @@ public class IgniteMock implements Ignite {
// No-op.
}
+ /** {@inheritDoc} */
+ @Override public void resetLostPartitions(Collection<String> cacheNames) {
+ // No-op.
+ }
+
/**
* @param staticCfg Configuration.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index 1b1d14e..df60bf1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -509,11 +509,6 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
throw new UnsupportedOperationException("Method should be supported.");
}
- /** {@inheritDoc} */
- @Override public void resetLostPartitions() {
- throw new UnsupportedOperationException("Method should be supported.");
- }
-
/**
*
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index 15a520a..05a72e3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -624,6 +624,11 @@ public class IgniteProcessProxy implements IgniteEx {
}
/** {@inheritDoc} */
+ @Override public void resetLostPartitions(Collection<String> cacheNames) {
+ throw new UnsupportedOperationException("Operation isn't supported yet.");
+ }
+
+ /** {@inheritDoc} */
@Override public void close() throws IgniteException {
final CountDownLatch rmtNodeStoppedLatch = new CountDownLatch(1);