You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/21 09:53:05 UTC
[06/28] incubator-ignite git commit: IGNITE-920 - Fixed value sending
in near cache.
IGNITE-920 - Fixed value sending in near cache.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a927eb29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a927eb29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a927eb29
Branch: refs/heads/ignite-23
Commit: a927eb29276796831ead8e9351e30947c4480bf8
Parents: d3c056e
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Mon May 18 11:38:49 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Mon May 18 11:38:49 2015 -0700
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 24 +++-
.../distributed/GridDistributedTxMapping.java | 5 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 1 +
.../distributed/near/GridNearCacheEntry.java | 2 +-
.../cache/distributed/near/GridNearTxLocal.java | 5 +-
.../near/GridNearTxPrepareFuture.java | 10 +-
.../near/GridNearTxPrepareResponse.java | 28 +++-
.../transactions/IgniteTxLocalAdapter.java | 4 +-
.../cache/IgniteCacheNearLockValueSelfTest.java | 144 +++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
10 files changed, 212 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a927eb29/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index d98b023..2d9828a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2297,7 +2297,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), null, false).get();
+ if (ctx.cache().cache(cacheCfg.getName()) == null)
+ ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), null, false).get();
return ctx.cache().publicJCache(cacheCfg.getName());
}
@@ -2341,7 +2342,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, false).get();
+ IgniteInternalCache<Object, Object> cache = ctx.cache().cache(cacheCfg.getName());
+
+ if (cache == null)
+ ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, false).get();
+ else {
+ if (cache.configuration().getNearConfiguration() == null)
+ ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, false).get();
+ }
return ctx.cache().publicJCache(cacheCfg.getName());
}
@@ -2380,7 +2388,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false).get();
+ IgniteInternalCache<Object, Object> internalCache = ctx.cache().cache(cacheName);
+
+ if (internalCache == null)
+ ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false).get();
+ else {
+ if (internalCache.configuration().getNearConfiguration() == null)
+ ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false).get();
+ }
return ctx.cache().publicJCache(cacheName);
}
@@ -2418,7 +2433,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- ctx.cache().getOrCreateFromTemplate(cacheName).get();
+ if (ctx.cache().cache(cacheName) == null)
+ ctx.cache().getOrCreateFromTemplate(cacheName).get();
return ctx.cache().publicJCache(cacheName);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a927eb29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 58c7725..fded3c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -169,12 +169,13 @@ public class GridDistributedTxMapping implements Externalizable {
/**
* @param dhtVer DHT version.
+ * @param writeVer DHT writeVersion.
*/
- public void dhtVersion(GridCacheVersion dhtVer) {
+ public void dhtVersion(GridCacheVersion dhtVer, GridCacheVersion writeVer) {
this.dhtVer = dhtVer;
for (IgniteTxEntry e : entries)
- e.dhtVersion(dhtVer);
+ e.dhtVersion(writeVer);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a927eb29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 3a1a80a..8cb10cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -586,6 +586,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
tx.colocated() ? tx.xid() : tx.nearFutureId(),
nearMiniId == null ? tx.xid() : nearMiniId,
tx.xidVersion(),
+ tx.writeVersion(),
tx.invalidPartitions(),
ret,
prepErr);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a927eb29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index c7fa4ab..29a8e5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -301,7 +301,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
else {
CacheObject val0 = valueBytesUnlocked();
- return F.t(ver, val0);
+ return F.t(dhtVer, val0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a927eb29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c665354..1e9b502 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -388,15 +388,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/**
* @param nodeId Node ID.
* @param dhtVer DHT version.
+ * @param writeVer Write version.
*/
- void addDhtVersion(UUID nodeId, GridCacheVersion dhtVer) {
+ void addDhtVersion(UUID nodeId, GridCacheVersion dhtVer, GridCacheVersion writeVer) {
// This step is very important as near and DHT versions grow separately.
cctx.versions().onReceived(nodeId, dhtVer);
GridDistributedTxMapping m = mappings.get(nodeId);
if (m != null)
- m.dhtVersion(dhtVer);
+ m.dhtVersion(dhtVer, writeVer);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a927eb29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index f573187..9284f49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -1023,10 +1023,16 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
}
if (!m.empty()) {
+ GridCacheVersion writeVer = res.writeVersion();
+
+ // Backward compatibility.
+ if (writeVer == null)
+ writeVer = res.dhtVersion();
+
// Register DHT version.
- tx.addDhtVersion(m.node().id(), res.dhtVersion());
+ tx.addDhtVersion(m.node().id(), res.dhtVersion(), writeVer);
- m.dhtVersion(res.dhtVersion());
+ m.dhtVersion(res.dhtVersion(), writeVer);
if (m.near())
tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a927eb29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 2456674..f8c07f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -53,6 +53,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/** DHT version. */
private GridCacheVersion dhtVer;
+ /** Write version. */
+ private GridCacheVersion writeVer;
+
/** */
@GridToStringInclude
@GridDirectCollection(int.class)
@@ -101,6 +104,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
IgniteUuid futId,
IgniteUuid miniId,
GridCacheVersion dhtVer,
+ GridCacheVersion writeVer,
Collection<Integer> invalidParts,
GridCacheReturn retVal,
Throwable err
@@ -114,6 +118,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
this.futId = futId;
this.miniId = miniId;
this.dhtVer = dhtVer;
+ this.writeVer = writeVer;
this.invalidParts = invalidParts;
this.retVal = retVal;
}
@@ -158,6 +163,13 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
}
/**
+ * @return Write version.
+ */
+ public GridCacheVersion writeVersion() {
+ return writeVer;
+ }
+
+ /**
* Adds owned value.
*
* @param key Key.
@@ -371,6 +383,12 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
writer.incrementState();
+ case 19:
+ if (!writer.writeMessage("writeVer", writeVer))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -459,6 +477,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
reader.incrementState();
+ case 19:
+ writeVer = reader.readMessage("writeVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return true;
@@ -471,7 +497,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 19;
+ return 20;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a927eb29/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index fc3efba..5c5076e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -738,7 +738,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
// in order to keep near entries on backup nodes until
// backup remote transaction completes.
if (cacheCtx.isNear()) {
- ((GridNearCacheEntry)cached).recordDhtVersion(txEntry.dhtVersion());
+ if (txEntry.op() == CREATE || txEntry.op() == UPDATE ||
+ txEntry.op() == DELETE || txEntry.op() == TRANSFORM)
+ ((GridNearCacheEntry)cached).recordDhtVersion(txEntry.dhtVersion());
if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) &&
txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a927eb29/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
new file mode 100644
index 0000000..fe60331
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ *
+ */
+public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGridsMultiThreaded(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (getTestGridName(0).equals(gridName))
+ cfg.setClientMode(true);
+
+ cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDhtVersion() throws Exception {
+ CacheConfiguration<Object, Object> pCfg = new CacheConfiguration<>("partitioned");
+
+ pCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+ try (IgniteCache<Object, Object> cache = ignite(0).getOrCreateCache(pCfg, new NearCacheConfiguration<>())) {
+ cache.put("key1", "val1");
+
+ for (int i = 0; i < 3; i++) {
+ ((TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi()).clear();
+ ((TestCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).clear();
+
+ try (Transaction tx = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.get("key1");
+
+ tx.commit();
+ }
+
+ TestCommunicationSpi comm = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+
+ assertEquals(1, comm.requests().size());
+
+ GridCacheAdapter<Object, Object> primary = ((IgniteKernal)grid(1)).internalCache("partitioned");
+
+ GridCacheEntryEx dhtEntry = primary.peekEx(primary.context().toCacheKeyObject("key1"));
+
+ assertNotNull(dhtEntry);
+
+ GridNearLockRequest req = comm.requests().iterator().next();
+
+ assertEquals(dhtEntry.version(), req.dhtVersion(0));
+
+ // Check entry version in near cache after commit.
+ GridCacheAdapter<Object, Object> near = ((IgniteKernal)grid(0)).internalCache("partitioned");
+
+ GridNearCacheEntry nearEntry = (GridNearCacheEntry)near.peekEx(near.context().toCacheKeyObject("key1"));
+
+ assertNotNull(nearEntry);
+
+ assertEquals(dhtEntry.version(), nearEntry.dhtVersion());
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ private Collection<GridNearLockRequest> reqs = new ConcurrentLinkedDeque<>();
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ if (msg instanceof GridIoMessage) {
+ GridIoMessage ioMsg = (GridIoMessage)msg;
+
+ if (ioMsg.message() instanceof GridNearLockRequest)
+ reqs.add((GridNearLockRequest)ioMsg.message());
+ }
+
+ super.sendMessage(node, msg);
+ }
+
+ /**
+ * @return Collected requests.
+ */
+ public Collection<GridNearLockRequest> requests() {
+ return reqs;
+ }
+
+ /**
+ *
+ */
+ public void clear() {
+ reqs.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a927eb29/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 28b10d9..159a8d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -446,6 +446,8 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(CacheNoValueClassOnServerNodeTest.class);
+ suite.addTestSuite(IgniteCacheNearLockValueSelfTest.class);
+
return suite;
}
}