You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/12 07:34:10 UTC
[39/57] [abbrv] ignite git commit: Merge branch master into
ignite-3477-master
Merge branch master into ignite-3477-master
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3eb05de5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3eb05de5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3eb05de5
Branch: refs/heads/ignite-3477-debug
Commit: 3eb05de5e2ee0d14567167bfe8547441cae69523
Parents: 8122099 aeacad6
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Apr 11 13:16:03 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Apr 11 13:16:03 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 8 +-
.../GridCachePartitionExchangeManager.java | 6 +-
.../processors/cache/GridCachePreloader.java | 4 +-
.../cache/GridCachePreloaderAdapter.java | 4 +-
.../dht/preloader/GridDhtPartitionDemander.java | 2 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 8 +-
.../GridDhtPartitionSupplyMessage.java | 103 +++--
.../GridDhtPartitionSupplyMessageV2.java | 422 -------------------
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../resources/META-INF/classnames.properties | 1 -
.../cache/ClusterStateAbstractTest.java | 10 +-
.../CacheLateAffinityAssignmentTest.java | 6 +-
.../IgniteCacheReadFromBackupTest.java | 6 +-
.../atomic/IgniteCacheAtomicProtocolTest.java | 3 +-
14 files changed, 84 insertions(+), 501 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index b80ad04,7c2599a..f7f0aff
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@@ -332,10 -537,8 +334,10 @@@ class GridDhtPartitionSupplier
if (!reply(node, d, s, scId))
return;
- s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+ s = new GridDhtPartitionSupplyMessage(d.updateSequence(),
- cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
+ cctx.cacheId(),
+ d.topologyVersion(),
+ cctx.deploymentEnabled());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index cc30321,a01be28..ee461ab
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@@ -66,9 -64,9 +64,14 @@@ public class GridDhtPartitionSupplyMess
@GridDirectCollection(int.class)
private Collection<Integer> missed;
++ /** Partitions for which we were able to get historical iterator. */
++ @GridToStringInclude
++ @GridDirectCollection(int.class)
++ private Collection<Integer> clean;
++
/** Entries. */
@GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
- private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>();
+ private Map<Integer, CacheEntryInfoCollection> infos;
/** Message size. */
@GridDirectTransient
@@@ -159,6 -142,6 +147,25 @@@
}
/**
++ * Marks partition {@code p} as clean. The {@code clean} set is created lazily on first use, and the
++ * tracked message size is increased by 4 only when the partition was not already in the set
++ * (presumably the serialized size of one {@code int} partition id - TODO confirm).
++ *
++ * @param p Partition to clean.
++ */
++ void clean(int p) {
++ if (clean == null)
++ clean = new HashSet<>();
++
++ if (clean.add(p))
++ msgSize += 4;
++ }
++
++ /**
++ * Checks whether partition {@code p} has been marked through {@link #clean(int)}.
++ *
++ * @param p Partition to check.
++ * @return {@code true} if the {@code clean} set exists and contains {@code p}, {@code false} otherwise
++ * (including when no partition has been marked yet).
++ */
++ boolean isClean(int p) {
++ return clean != null && clean.contains(p);
++ }
++
++ /**
* @param p Missed partition.
*/
void missed(int p) {
@@@ -288,7 -274,7 +298,7 @@@
switch (writer.state()) {
case 3:
- if (!writer.writeBoolean("ack", ack))
- if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
++ if (!writer.writeCollection("clean", clean, MessageCollectionItemType.INT))
return false;
writer.incrementState();
@@@ -312,13 -298,7 +322,13 @@@
writer.incrementState();
case 7:
- if (!writer.writeLong("updateSeq", updateSeq))
++ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
- if (!writer.writeInt("workerId", workerId))
+ if (!writer.writeLong("updateSeq", updateSeq))
return false;
writer.incrementState();
@@@ -340,7 -320,7 +350,7 @@@
switch (reader.state()) {
case 3:
- ack = reader.readBoolean("ack");
- infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
++ clean = reader.readCollection("clean", MessageCollectionItemType.INT);
if (!reader.isLastRead())
return false;
@@@ -372,15 -352,7 +382,15 @@@
reader.incrementState();
case 7:
- updateSeq = reader.readLong("updateSeq");
++ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
- workerId = reader.readInt("workerId");
+ updateSeq = reader.readLong("updateSeq");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --cc modules/core/src/main/resources/META-INF/classnames.properties
index 473f176,8c5a72e..335a33f
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@@ -763,9 -740,9 +763,8 @@@ org.apache.ignite.internal.processors.c
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap
-org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$1
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$SupplyContextPhase
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage
- org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$2
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
index f095e79,0000000..ce7829a
mode 100644,000000..100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
@@@ -1,439 -1,0 +1,439 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
- import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
++import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
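+ * Base class for cluster activation/deactivation tests. A custom communication SPI fails the test when
+ * partition supply or demand messages are sent while a test has marked them as forbidden.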
+ */
+@SuppressWarnings("TooBroadScope")
+public abstract class ClusterStateAbstractTest extends GridCommonAbstractTest {
+ /** Number of entries the tests put into a cache and read back. */
+ public static final int ENTRY_CNT = 5000;
+
+ /** Number of nodes each test starts up front. */
+ public static final int GRID_CNT = 4;
+
+ /** Name of the cache configured on every node in {@code getConfiguration}. */
+ private static final String CACHE_NAME = "cache1";
+
+ /** Message classes that {@code TestCommunicationSpi} refuses to send while they are listed here. */
+ private static final Collection<Class> forbidden = new GridConcurrentHashSet<>();
+
+ /** First error raised for a forbidden message; taken and rethrown by {@code afterTest}. */
+ private static AtomicReference<Exception> errEncountered = new AtomicReference<>();
+
+ /** Value passed to {@code setActiveOnStart} for nodes configured afterwards. */
+ private boolean activeOnStart = true;
+
+ /** When {@code true}, nodes configured afterwards are switched to client mode. */
+ private boolean client;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * On top of the inherited configuration this applies the current {@code activeOnStart} flag, registers
+ * the cache returned by {@link #cacheConfiguration(String)} for {@code CACHE_NAME}, enables client mode
+ * when {@code client} is set, and installs a new {@code TestCommunicationSpi}.
+ */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setActiveOnStart(activeOnStart);
+
+ cfg.setCacheConfiguration(cacheConfiguration(CACHE_NAME));
+
+ if (client)
+ cfg.setClientMode(true);
+
+ cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+ return cfg;
+ }
+
+ /**
+ * Supplies the configuration of the cache that {@code getConfiguration} registers on every started node.
+ * Subclasses implement this to choose the cache settings under test.
+ *
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ protected abstract CacheConfiguration cacheConfiguration(String cacheName);
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Stops all grids, clears the set of forbidden message classes, and rethrows the error recorded in
+ * {@code errEncountered} (if any) so that a forbidden send fails the test even when the exception
+ * was not visible to the test code. The recorded error is reset to {@code null} in the process.
+ */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ forbidden.clear();
+
+ Exception err = errEncountered.getAndSet(null);
+
+ if (err != null)
+ throw err;
+ }
+
+ /**
+ * Starts {@code GRID_CNT} nodes with {@code activeOnStart = false} while partition supply and demand
+ * messages are forbidden, and checks that every node is inactive. The ban is then lifted, the cluster
+ * is activated, {@code ENTRY_CNT} entries are put into the dynamically created cache {@code "cache2"},
+ * and the cluster is deactivated and verified to be inactive again.
+ *
+ * @throws Exception if failed.
+ */
+ public void testDynamicCacheStart() throws Exception {
+ activeOnStart = false;
+
- forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++ forbidden.add(GridDhtPartitionSupplyMessage.class);
+ forbidden.add(GridDhtPartitionDemandMessage.class);
+
+ startGrids(GRID_CNT);
+
+ checkInactive(GRID_CNT);
+
+ forbidden.clear();
+
+ grid(0).active(true);
+
+ IgniteCache<Object, Object> cache2 = grid(0).createCache(new CacheConfiguration<>("cache2"));
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ cache2.put(k, k);
+
+ grid(0).active(false);
+
+ checkInactive(GRID_CNT);
+
+ stopAllGrids();
+ }
+
+ /**
+ * Checks that no partition supply or demand messages are sent while the cluster is inactive: both
+ * message types are forbidden during the inactive start and again before the final shutdown.
+ * In between, the cluster is activated, {@code ENTRY_CNT} entries are written and read back on every
+ * node, two extra nodes are started and then stopped one by one (with an explicit rebalance after the
+ * first stop), the data is re-checked on the original nodes, and the cluster is deactivated.
+ *
+ * @throws Exception if failed.
+ */
+ public void testNoRebalancing() throws Exception {
+ activeOnStart = false;
+
- forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++ forbidden.add(GridDhtPartitionSupplyMessage.class);
+ forbidden.add(GridDhtPartitionDemandMessage.class);
+
+ startGrids(GRID_CNT);
+
+ checkInactive(GRID_CNT);
+
+ forbidden.clear();
+
+ grid(0).active(true);
+
+ awaitPartitionMapExchange();
+
+ final IgniteCache<Object, Object> cache = grid(0).cache(CACHE_NAME);
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ cache.put(k, k);
+
+ for (int g = 0; g < GRID_CNT; g++) {
+ // Tests that state changes are propagated to existing and new nodes.
+ assertTrue(grid(g).active());
+
+ IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ assertEquals(k, cache0.get(k));
+ }
+
+ // Check that new node startup and shutdown works fine after activation.
+ startGrid(GRID_CNT);
+ startGrid(GRID_CNT + 1);
+
+ for (int g = 0; g < GRID_CNT + 2; g++) {
+ IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ assertEquals("Failed for [grid=" + g + ", key=" + k + ']', k, cache0.get(k));
+ }
+
+ stopGrid(GRID_CNT + 1);
+
+ for (int g = 0; g < GRID_CNT + 1; g++)
+ grid(g).cache(CACHE_NAME).rebalance().get();
+
+ stopGrid(GRID_CNT);
+
+ for (int g = 0; g < GRID_CNT; g++) {
+ IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ assertEquals(k, cache0.get(k));
+ }
+
+ grid(0).active(false);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ for (int g = 0; g < GRID_CNT; g++) {
+ if (grid(g).active())
+ return false;
+ }
+
+ return true;
+ }
+ }, 5000);
+
+ checkInactive(GRID_CNT);
+
- forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++ forbidden.add(GridDhtPartitionSupplyMessage.class);
+ forbidden.add(GridDhtPartitionDemandMessage.class);
+
+ // Should stop without exchange.
+ stopAllGrids();
+ }
+
+ /**
+ * Starts {@code GRID_CNT} nodes plus one client-mode node, all inactive and with partition supply and
+ * demand messages forbidden, then activates and later deactivates the cluster through the client node.
+ * While the cluster is active, {@code ENTRY_CNT} entries are put via the client and read back on every
+ * node, including the client. After deactivation all {@code GRID_CNT + 1} nodes must be inactive.
+ *
+ * @throws Exception if failed.
+ */
+ public void testActivationFromClient() throws Exception {
- forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++ forbidden.add(GridDhtPartitionSupplyMessage.class);
+ forbidden.add(GridDhtPartitionDemandMessage.class);
+
+ activeOnStart = false;
+
+ startGrids(GRID_CNT);
+
+ client = true;
+
+ startGrid(GRID_CNT);
+
+ checkInactive(GRID_CNT + 1);
+
+ Ignite cl = grid(GRID_CNT);
+
+ forbidden.clear();
+
+ cl.active(true);
+
+ awaitPartitionMapExchange();
+
+ IgniteCache<Object, Object> cache = cl.cache(CACHE_NAME);
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ cache.put(k, k);
+
+ for (int g = 0; g < GRID_CNT + 1; g++) {
+ // Tests that state changes are propagated to existing and new nodes.
+ assertTrue(grid(g).active());
+
+ IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
+
+ for (int k = 0; k < ENTRY_CNT; k++)
+ assertEquals(k, cache0.get(k));
+ }
+
+ cl.active(false);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ for (int g = 0; g < GRID_CNT + 1; g++) {
+ if (grid(g).active())
+ return false;
+ }
+
+ return true;
+ }
+ }, 5000);
+
+ checkInactive(GRID_CNT + 1);
+ }
+
+ /**
+ * Tests that state doesn't change until all acquired locks are released.
+ * <p>
+ * Scenario: a lock on key {@code 1} is taken through grid 0, deactivation is requested from grid 1 on
+ * another thread, and after a 2 second pause the request must still be pending with at least one node
+ * reporting an active state. Once the lock is released the deactivation must complete within the test
+ * timeout and all nodes must be inactive.
+ * <p>
+ * The test is currently muted: the leading {@code fail(...)} call aborts it with a link to the
+ * tracking issue before the scenario runs.
+ *
+ * @throws Exception If fails.
+ */
+ public void testDeactivationWithPendingLock() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-4931");
+
+ startGrids(GRID_CNT);
+
+ final CountDownLatch finishedLatch = new CountDownLatch(1);
+
+ Lock lock = grid(0).cache(CACHE_NAME).lock(1);
+
+ IgniteInternalFuture<?> fut;
+
+ lock.lock();
+
+ try {
+ fut = multithreadedAsync(new Runnable() {
+ @Override public void run() {
+ grid(1).active(false);
+
+ finishedLatch.countDown();
+ }
+ }, 1);
+
+ U.sleep(2000);
+
+ assert !fut.isDone();
+
+ boolean hasActive = false;
+
+ for (int g = 0; g < GRID_CNT; g++) {
+ IgniteEx grid = grid(g);
+
+ if (grid.active()) {
+ hasActive = true;
+
+ break;
+ }
+
+ }
+
+ assertTrue(hasActive);
+ }
+ finally {
+ lock.unlock();
+ }
+
+ fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
+
+ checkInactive(GRID_CNT);
+
+ finishedLatch.await();
+ }
+
+ /**
+ * Tests that state doesn't change until all pending transactions are finished.
+ * <p>
+ * Scenario: a {@code PESSIMISTIC}/{@code REPEATABLE_READ} transaction is opened on grid 0 and reads
+ * key {@code 1}; deactivation is requested from the same node on another thread, and after a 2 second
+ * pause the request must still be pending with at least one node reporting an active state. The
+ * transaction then writes {@code 1 -> 2} and commits, after which the deactivation must complete.
+ * Finally the cluster is re-activated and every node must read the value {@code 2} for key {@code 1}.
+ * <p>
+ * The test is currently muted: the leading {@code fail(...)} call aborts it with a link to the
+ * tracking issue before the scenario runs.
+ *
+ * @throws Exception If fails.
+ */
+ public void testDeactivationWithPendingTransaction() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-4931");
+
+ startGrids(GRID_CNT);
+
+ final CountDownLatch finishedLatch = new CountDownLatch(1);
+
+ final Ignite ignite0 = grid(0);
+
+ final IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME);
+
+ IgniteInternalFuture<?> fut;
+
+ try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache0.get(1);
+
+ fut = multithreadedAsync(new Runnable() {
+ @Override public void run() {
+ ignite0.active(false);
+
+ finishedLatch.countDown();
+ }
+ }, 1);
+
+ U.sleep(2000);
+
+ assert !fut.isDone();
+
+ boolean hasActive = false;
+
+ for (int g = 0; g < GRID_CNT; g++) {
+ IgniteEx grid = grid(g);
+
+ if (grid.active()) {
+ hasActive = true;
+
+ break;
+ }
+
+ }
+
+ assertTrue(hasActive);
+
+ cache0.put(1, 2);
+
+ tx.commit();
+ }
+
+ fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
+
+ checkInactive(GRID_CNT);
+
+ ignite0.active(true);
+
+ for (int g = 0; g < GRID_CNT; g++)
+ assertEquals(2, grid(g).cache(CACHE_NAME).get(1));
+
+ finishedLatch.await();
+ }
+
+ /**
+ * Asserts that the grids with indexes {@code 0} to {@code cnt - 1} all report an inactive state.
+ *
+ * @param cnt Number of grids to check.
+ */
+ private void checkInactive(int cnt) {
+ for (int g = 0; g < cnt; g++)
+ assertFalse(grid(g).active());
+ }
+
+ /**
+ * Communication SPI that refuses to send any message whose payload class is currently listed in
+ * {@code forbidden}. Both {@code sendMessage} overrides cast the outgoing message to
+ * {@code GridIoMessage} unconditionally before checking it, then delegate to the parent SPI.
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ checkForbidden((GridIoMessage)msg);
+
+ super.sendMessage(node, msg, ackC);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ checkForbidden((GridIoMessage)msg);
+
+ super.sendMessage(node, msg);
+ }
+
+ /**
+ * Rejects the message if the class of its wrapped payload is listed in {@code forbidden}. The first
+ * such error is also stored in {@code errEncountered} so the test can fail even if the thrown
+ * exception never reaches the test code.
+ *
+ * @param msg Message to check.
+ * @throws IgniteSpiException If the payload class is forbidden.
+ */
+ private void checkForbidden(GridIoMessage msg) {
+ if (forbidden.contains(msg.message().getClass())) {
+ IgniteSpiException err = new IgniteSpiException("Message is forbidden for this test: " + msg.message());
+
+ // Set error in case if this exception is not visible to the user code.
+ errEncountered.compareAndSet(null, err);
+
+ throw err;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------