You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/12 07:13:49 UTC
[1/8] ignite git commit: IGNITE-5194 .NET: Fix TryGetIgnite to return
an instance with any name when there is only one
Repository: ignite
Updated Branches:
refs/heads/ignite-5075-cacheStart e0e587864 -> 3762b0dd8
IGNITE-5194 .NET: Fix TryGetIgnite to return an instance with any name when there is only one
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0051e18
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0051e18
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0051e18
Branch: refs/heads/ignite-5075-cacheStart
Commit: f0051e18add681e613efe9420c99d7f5c438a192
Parents: 57c6705
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Thu May 11 13:43:37 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu May 11 13:43:37 2017 +0300
----------------------------------------------------------------------
.../Apache.Ignite.Core.Tests/IgniteStartStopTest.cs | 2 ++
.../platforms/dotnet/Apache.Ignite.Core/Ignition.cs | 16 ++++++++++++----
2 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0051e18/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
index 486878a..529128a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteStartStopTest.cs
@@ -185,9 +185,11 @@ namespace Apache.Ignite.Core.Tests
Ignition.Start(cfg);
Assert.IsNotNull(Ignition.GetIgnite());
+ Assert.IsNotNull(Ignition.TryGetIgnite());
Ignition.Start(cfg);
Assert.Throws<IgniteException>(() => Ignition.GetIgnite());
+ Assert.IsNull(Ignition.TryGetIgnite());
Assert.AreEqual(2, Ignition.GetAll().Count);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0051e18/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
index cdb1064..fdddbb7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
@@ -651,14 +651,22 @@ namespace Apache.Ignite.Core
}
/// <summary>
- /// Gets an instance of default no-name grid, or <c>null</c> if none found. Note that
- /// caller of this method should not assume that it will return the same
- /// instance every time.
+ /// Gets the default Ignite instance with null name, or an instance with any name when there is only one.
+ /// Returns null when there are no Ignite instances started, or when there is more than one,
+ /// and none of them has null name.
/// </summary>
/// <returns>An instance of default no-name grid, or null.</returns>
public static IIgnite TryGetIgnite()
{
- return TryGetIgnite(null);
+ lock (SyncRoot)
+ {
+ if (Nodes.Count == 1)
+ {
+ return Nodes.Single().Value;
+ }
+
+ return TryGetIgnite(null);
+ }
}
/// <summary>
[8/8] ignite git commit: ignite-5075
Posted by sb...@apache.org.
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3762b0dd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3762b0dd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3762b0dd
Branch: refs/heads/ignite-5075-cacheStart
Commit: 3762b0dd85796f3b3875a4b8f67b21f68e586c15
Parents: 0f25894
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 12 09:51:31 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 12 10:13:39 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 17 ++----
.../internal/processors/cache/CacheData.java | 1 -
.../cache/DynamicCacheDescriptor.java | 2 -
.../processors/cache/ExchangeActions.java | 2 +-
.../processors/cache/GridCacheContext.java | 12 +++++
.../processors/cache/GridCacheIoManager.java | 4 +-
.../processors/cache/GridCacheProcessor.java | 56 ++++++++------------
.../dht/GridClientPartitionTopology.java | 2 +-
.../dht/GridDhtPartitionTopologyImpl.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 13 ++---
.../ignite/spi/discovery/DiscoveryDataBag.java | 2 +-
.../internal/GridNodeMetricsLogSelfTest.java | 2 +-
.../IgniteCrossCacheTxStoreSelfTest.java | 1 -
.../loadtests/hashmap/GridCacheTestContext.java | 2 +
14 files changed, 52 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index bd41ccc..d186cdb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -118,16 +118,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param cacheId Cache ID.
- * @return Cache start topology version.
- */
- public AffinityTopologyVersion localStartVersion(int cacheId) {
- DynamicCacheDescriptor desc = registeredCaches.get(cacheId);
-
- return desc != null ? desc.localStartVersion() : null;
- }
-
- /**
* Callback invoked from discovery thread when discovery message is received.
*
* @param type Event type.
@@ -330,7 +320,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param exchActions Cache change requests to execte on exchange.
+ * @param exchActions Cache change requests to execute on exchange.
*/
private void updateCachesInfo(ExchangeActions exchActions) {
for (ExchangeActions.ActionData action : exchActions.stopRequests()) {
@@ -409,7 +399,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (startCache)
cctx.cache().prepareCacheStart(cacheDesc, nearCfg, fut.topologyVersion());
- if (fut.isCacheAdded(cacheDesc.cacheId(), fut.topologyVersion())) {
+ if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
}
@@ -891,9 +881,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert cacheDesc != null : aff.cacheName();
- return fut.cacheStarted(aff.cacheId()) ||
+ return fut.cacheAddedOnExchange(aff.cacheId(), cacheDesc.receivedFrom()) ||
!fut.exchangeId().nodeId().equals(cctx.localNodeId()) ||
- cctx.localNodeId().equals(cacheDesc.receivedFrom()) ||
(affNodes.size() == 1 && affNodes.contains(cctx.localNode()));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index 0c97ab0..4768a9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
import java.util.UUID;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 130ebde..5c7060c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.HashMap;
-import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMode;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index eed60ac..99fd29d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -31,7 +31,7 @@ import java.util.List;
import org.jetbrains.annotations.Nullable;
/**
- * Cache change requests to execute on request.
+ * Cache change requests to execute when a {@link DynamicCacheChangeBatch} event is received.
*/
public class ExchangeActions {
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 5a6f793..aa503b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -237,6 +237,9 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Topology version when cache was started on local node. */
private AffinityTopologyVersion locStartTopVer;
+ /** */
+ private UUID rcvdFrom;
+
/** Dynamic cache deployment ID. */
private IgniteUuid dynamicDeploymentId;
@@ -290,6 +293,7 @@ public class GridCacheContext<K, V> implements Externalizable {
CacheConfiguration cacheCfg,
CacheType cacheType,
AffinityTopologyVersion locStartTopVer,
+ UUID rcvdFrom,
boolean affNode,
boolean updatesAllowed,
MemoryPolicy memPlc,
@@ -336,6 +340,7 @@ public class GridCacheContext<K, V> implements Externalizable {
this.cacheCfg = cacheCfg;
this.cacheType = cacheType;
this.locStartTopVer = locStartTopVer;
+ this.rcvdFrom = rcvdFrom;
this.affNode = affNode;
this.updatesAllowed = updatesAllowed;
this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg);
@@ -455,6 +460,13 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @return Node ID cache was received from.
+ */
+ public UUID receivedFrom() {
+ return rcvdFrom;
+ }
+
+ /**
* @return Topology version when cache was started on local node.
*/
public AffinityTopologyVersion startTopologyVersion() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 348d9d8..a8a4dcd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -146,7 +146,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
assert cacheMsg.topologyVersion() != null : cacheMsg;
- AffinityTopologyVersion waitVer = cctx.affinity().localStartVersion(cacheMsg.cacheId());
+ DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId());
+
+ AffinityTopologyVersion waitVer = cacheDesc != null ? cacheDesc.localStartVersion() : null;
if (waitVer == null)
waitVer = new AffinityTopologyVersion(cctx.localNode().order());
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f9b015d..bc6a7d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -897,7 +897,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- *
+ * @throws IgniteCheckedException if check failed.
*/
private void checkConsistency() throws IgniteCheckedException {
for (ClusterNode n : ctx.discovery().remoteNodes()) {
@@ -909,9 +909,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DeploymentMode locDepMode = ctx.config().getDeploymentMode();
DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
- CU.checkAttributeMismatch(
- log, null, n.id(), "deploymentMode", "Deployment mode",
- locDepMode, rmtDepMode, true);
+ CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
+ locDepMode, rmtDepMode, true);
}
}
@@ -1101,13 +1100,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
}
-// TODO
-// if (clientReconnectReqs != null) {
-// for (Map.Entry<UUID, DynamicCacheChangeBatch> e : clientReconnectReqs.entrySet())
-// processClientReconnectData(e.getKey(), e.getValue());
-//
-// clientReconnectReqs = null;
-// }
sharedCtx.onReconnected();
@@ -1336,15 +1328,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param cfg Cache configuration to use to create cache.
* @param pluginMgr Cache plugin manager.
- * @param cacheType Cache type.
+ * @param desc Cache descriptor.
+ * @param locStartTopVer Current topology version.
* @param cacheObjCtx Cache object context.
+ * @param affNode {@code True} if local node is an affinity node.
* @param updatesAllowed Updates allowed flag.
* @return Cache context.
* @throws IgniteCheckedException If failed to create cache.
*/
private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
@Nullable CachePluginManager pluginMgr,
- CacheType cacheType,
+ DynamicCacheDescriptor desc,
AffinityTopologyVersion locStartTopVer,
CacheObjectContext cacheObjCtx,
boolean affNode,
@@ -1365,7 +1359,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
QueryUtils.prepareCacheConfiguration(cfg);
- validate(ctx.config(), cfg, cacheType, cfgStore);
+ validate(ctx.config(), cfg, desc.cacheType(), cfgStore);
if (pluginMgr == null)
pluginMgr = new CachePluginManager(ctx, cfg);
@@ -1375,7 +1369,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx.jta().registerCache(cfg);
// Skip suggestions for internal caches.
- if (cacheType.userCache())
+ if (desc.cacheType().userCache())
suggestOptimizations(cfg, cfgStore != null);
Collection<Object> toPrepare = new ArrayList<>();
@@ -1418,8 +1412,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx,
sharedCtx,
cfg,
- cacheType,
+ desc.cacheType(),
locStartTopVer,
+ desc.receivedFrom(),
affNode,
updatesAllowed,
memPlc,
@@ -1550,8 +1545,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx,
sharedCtx,
cfg,
- cacheType,
+ desc.cacheType(),
locStartTopVer,
+ desc.receivedFrom(),
affNode,
true,
memPlc,
@@ -1728,8 +1724,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
prepareCacheStart(
cacheDesc.cacheConfiguration(),
nearCfg,
- cacheDesc.cacheType(),
- cacheDesc.deploymentId(),
+ cacheDesc,
exchTopVer,
cacheDesc.schema()
);
@@ -1749,8 +1744,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
prepareCacheStart(
desc.cacheConfiguration(),
t.get2(),
- desc.cacheType(),
- desc.deploymentId(),
+ desc,
exchTopVer,
desc.schema()
);
@@ -1778,8 +1772,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
prepareCacheStart(
desc.cacheConfiguration(),
null,
- desc.cacheType(),
- desc.deploymentId(),
+ desc,
exchTopVer,
desc.schema()
);
@@ -1793,8 +1786,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param startCfg Start configuration.
* @param reqNearCfg Near configuration if specified for client cache start request.
- * @param cacheType Cache type.
- * @param deploymentId Deployment ID.
+ * @param desc Cache descriptor.
* @param exchTopVer Current exchange version.
* @param schema Query schema.
* @throws IgniteCheckedException If failed.
@@ -1802,8 +1794,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
private void prepareCacheStart(
CacheConfiguration startCfg,
@Nullable NearCacheConfiguration reqNearCfg,
- CacheType cacheType,
- IgniteUuid deploymentId,
+ DynamicCacheDescriptor desc,
AffinityTopologyVersion exchTopVer,
@Nullable QuerySchema schema
) throws IgniteCheckedException {
@@ -1830,13 +1821,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
GridCacheContext cacheCtx = createCache(ccfg,
null,
- cacheType,
+ desc,
exchTopVer,
cacheObjCtx,
affNode,
true);
- cacheCtx.dynamicDeploymentId(deploymentId);
+ cacheCtx.dynamicDeploymentId(desc.deploymentId());
GridCacheAdapter cache = cacheCtx.cache();
@@ -1894,9 +1885,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx.removeCacheContext(ctx);
-// assert req.deploymentId().equals(ctx.dynamicDeploymentId()) : "Different deployment IDs [req=" + req +
-// ", ctxDepId=" + ctx.dynamicDeploymentId() + ']';
-
onKernalStop(cache, req.destroy());
stopCache(cache, true, req.destroy());
@@ -1913,7 +1901,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
@SuppressWarnings("unchecked")
public void onExchangeDone(
AffinityTopologyVersion topVer,
- ExchangeActions exchActions,
+ @Nullable ExchangeActions exchActions,
Throwable err
) {
for (GridCacheAdapter<?, ?> cache : caches.values()) {
@@ -2531,8 +2519,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (req.close() && desc.cacheConfiguration().getCacheMode() == LOCAL) {
req.close(false);
+
req.stop(true);
- req.destroy(true);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index f3c3a1b..1de64c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -281,7 +281,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
// If this is the oldest node.
- if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
+ if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheStarted(cacheId)) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index f16bb15..0410dda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -393,7 +393,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
int num = cctx.affinity().partitions();
if (cctx.rebalanceEnabled()) {
- boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
+ boolean added = exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom());
boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
@@ -541,7 +541,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
cntrMap.clear();
// If this is the oldest node.
- if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
+ if (oldest != null && (loc.equals(oldest) || exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()))) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 477d0ce..b4cb3c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -380,23 +380,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
* @param cacheId Cache ID to check.
- * @param topVer Topology version.
+ * @param rcvdFrom ID of the node the cache was received from.
* @return {@code True} if cache was added during this exchange.
*/
- public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
- if (cacheStarted(cacheId))
- return true;
-
- GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
- return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
+ public boolean cacheAddedOnExchange(int cacheId, UUID rcvdFrom) {
+ return dynamicCacheStarted(cacheId) || (exchId.isJoined() && exchId.nodeId().equals(rcvdFrom));
}
/**
* @param cacheId Cache ID.
* @return {@code True} if non-client cache was added during this exchange.
*/
- public boolean cacheStarted(int cacheId) {
+ public boolean dynamicCacheStarted(int cacheId) {
return exchActions != null && exchActions.cacheStarted(cacheId);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
index 3737d69..803beed 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
@@ -188,7 +188,7 @@ public class DiscoveryDataBag {
}
/**
- * @return ID on joining node.
+ * @return ID of joining node.
*/
public UUID joiningNodeId() {
return joiningNodeId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
index ea298e6..b6114d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
@@ -79,7 +79,7 @@ public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
//Check that nodes are alie
assertEquals("one", cache1.get(1));
- assertEquals("tow", cache2.get(2));
+ assertEquals("two", cache2.get(2));
String fullLog = strWr.toString();
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index a7682a1..bf5ba61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -26,7 +26,6 @@ import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
-
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3762b0dd/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 4f0d9a1..2096179 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -18,6 +18,7 @@
package org.apache.ignite.loadtests.hashmap;
import java.util.IdentityHashMap;
+import java.util.UUID;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -80,6 +81,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
defaultCacheConfiguration(),
CacheType.USER,
AffinityTopologyVersion.ZERO,
+ UUID.randomUUID(),
true,
true,
null,
[2/8] ignite git commit: IGNITE-5190 - ArrayIndexOutOfBoundsException
in GridMergeIndexSorted
Posted by sb...@apache.org.
IGNITE-5190 - ArrayIndexOutOfBoundsException in GridMergeIndexSorted
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a9dba5f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a9dba5f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a9dba5f
Branch: refs/heads/ignite-5075-cacheStart
Commit: 3a9dba5f86ad34736ccd278ebf91044e18cf9a6b
Parents: f0051e1
Author: Sergi Vladykin <se...@gmail.com>
Authored: Thu May 11 16:01:38 2017 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Thu May 11 16:01:38 2017 +0300
----------------------------------------------------------------------
.../query/h2/twostep/GridMergeIndexSorted.java | 3 +
.../query/IgniteSqlSplitterSelfTest.java | 68 ++++++++++++++++++++
2 files changed, 71 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3a9dba5f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
index f2d9de4..54c8dd4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
@@ -252,6 +252,9 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
*
*/
private void goNext() {
+ if (off == streams.length)
+ return; // All streams are done.
+
if (streams[off].next())
bubbleUp(streams, off, streamCmp);
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/3a9dba5f/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index ad1c8b8..f98f41b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -1609,6 +1609,37 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
}
}
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinWithSubquery() throws Exception {
+ IgniteCache<Integer, Contract> c1 = ignite(0).createCache(
+ cacheConfig("Contract", true,
+ Integer.class, Contract.class));
+
+ IgniteCache<Integer, PromoContract> c2 = ignite(0).createCache(
+ cacheConfig("PromoContract", true,
+ Integer.class, PromoContract.class));
+
+ for (int i = 0; i < 100; i++) {
+ int coId = i % 10;
+ int cust = i / 10;
+ c1.put( i, new Contract(coId, cust));
+ }
+
+ for (int i = 0; i < 10; i++)
+ c2.put(i, new PromoContract((i % 5) + 1, i));
+
+ final List<List<?>> res = c2.query(new SqlFieldsQuery("SELECT CO.CO_ID \n" +
+ "FROM PromoContract PMC \n" +
+ "INNER JOIN \"Contract\".Contract CO ON PMC.CO_ID = 5 \n" +
+ "AND PMC.CO_ID = CO.CO_ID \n" +
+ "INNER JOIN (SELECT CO_ID FROM PromoContract EBP WHERE EBP.CO_ID = 5 LIMIT 1) VPMC \n" +
+ "ON PMC.CO_ID = VPMC.CO_ID ")).getAll();
+
+ assertFalse(res.isEmpty());
+ }
+
/** @throws Exception if failed. */
public void testDistributedAggregates() throws Exception {
final String cacheName = "ints";
@@ -2128,4 +2159,41 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
@QuerySqlField
private int goodId;
}
+
+ /** */
+ private static class Contract implements Serializable {
+ /** */
+ @QuerySqlField(index = true)
+ private final int CO_ID;
+
+ /** */
+ @QuerySqlField(index = true)
+ private final int CUSTOMER_ID;
+
+ /** */
+ public Contract(final int CO_ID, final int CUSTOMER_ID) {
+ this.CO_ID = CO_ID;
+ this.CUSTOMER_ID = CUSTOMER_ID;
+ }
+
+ }
+
+ /** */
+ public class PromoContract implements Serializable {
+ /** */
+ @QuerySqlField(index = true, orderedGroups = {
+ @QuerySqlField.Group(name = "myIdx", order = 1)})
+ private final int CO_ID;
+
+ /** */
+ @QuerySqlField(index = true, orderedGroups = {
+ @QuerySqlField.Group(name = "myIdx", order = 0)})
+ private final int OFFER_ID;
+
+ /** */
+ public PromoContract(final int co_Id, final int offer_Id) {
+ this.CO_ID = co_Id;
+ this.OFFER_ID = offer_Id;
+ }
+ }
}
[3/8] ignite git commit: Removed condition, simplified logic
Posted by sb...@apache.org.
Removed condition, simplified logic
http://ci.ignite.apache.org/viewLog.html?buildId=606030&tab=buildResultsDiv&buildTypeId=IgniteTests_IgniteDataStrucutures
http://ci.ignite.apache.org/viewLog.html?buildId=606029&tab=buildResultsDiv&buildTypeId=IgniteTests_IgniteBinaryObjectsDataStrucutures
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7ea5830a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7ea5830a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7ea5830a
Branch: refs/heads/ignite-5075-cacheStart
Commit: 7ea5830a5f28bb52db3efa6955f505e731e87f90
Parents: 3a9dba5
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri May 12 01:06:05 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri May 12 01:06:05 2017 +0300
----------------------------------------------------------------------
.../GridCacheAtomicSequenceImpl.java | 101 ++++---------------
1 file changed, 21 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ea5830a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index d14bb47..5a87e4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -24,8 +24,6 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
@@ -44,8 +42,6 @@ import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupp
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -87,7 +83,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
/** Local value of sequence. */
@GridToStringInclude(sensitive = true)
- private long locVal;
+ private volatile long locVal;
/** Upper bound of local counter. */
private long upBound;
@@ -98,18 +94,12 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
/** Synchronization lock. */
private final Lock lock = new ReentrantLock();
- /** Await condition. */
- private Condition cond = lock.newCondition();
-
/** Callable for execution {@link #incrementAndGet} operation in async and sync mode. */
private final Callable<Long> incAndGetCall = internalUpdate(1, true);
/** Callable for execution {@link #getAndIncrement} operation in async and sync mode. */
private final Callable<Long> getAndIncCall = internalUpdate(1, false);
- /** Add and get cache call guard. */
- private final AtomicBoolean updateGuard = new AtomicBoolean();
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -161,14 +151,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
@Override public long get() {
checkRemoved();
- lock.lock();
-
- try {
- return locVal;
- }
- finally {
- lock.unlock();
- }
+ return locVal;
}
/** {@inheritDoc} */
@@ -235,70 +218,30 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
try {
// If reserved range isn't exhausted.
- if (locVal + l <= upBound) {
- long curVal = locVal;
+ long locVal0 = locVal;
- locVal += l;
+ if (locVal0 + l <= upBound) {
+ locVal = locVal0 + l;
- return updated ? locVal : curVal;
+ return updated ? locVal0 + l : locVal0;
}
- }
- finally {
- lock.unlock();
- }
-
- if (updateCall == null)
- updateCall = internalUpdate(l, updated);
-
- while (true) {
- if (updateGuard.compareAndSet(false, true)) {
- try {
- try {
- return retryTopologySafe(updateCall);
- }
- catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
- throw e;
- }
- catch (Exception e) {
- throw new IgniteCheckedException(e);
- }
- }
- finally {
- lock.lock();
- try {
- updateGuard.set(false);
+ if (updateCall == null)
+ updateCall = internalUpdate(l, updated);
- cond.signalAll();
- }
- finally {
- lock.unlock();
- }
- }
+ try {
+ return updateCall.call();
}
- else {
- lock.lock();
-
- try {
- while (locVal >= upBound && updateGuard.get())
- U.await(cond, 500, MILLISECONDS);
-
- checkRemoved();
-
- // If reserved range isn't exhausted.
- if (locVal + l <= upBound) {
- long curVal = locVal;
-
- locVal += l;
-
- return updated ? locVal : curVal;
- }
- }
- finally {
- lock.unlock();
- }
+ catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
}
}
+ finally {
+ lock.unlock();
+ }
}
/** Get local batch size for this sequence.
@@ -422,12 +365,10 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
curLocVal = locVal;
// If local range was already reserved in another thread.
- if (locVal + l <= upBound) {
- long retVal = locVal;
-
- locVal += l;
+ if (curLocVal + l <= upBound) {
+ locVal = curLocVal + l;
- return updated ? locVal : retVal;
+ return updated ? curLocVal + l : curLocVal;
}
long curGlobalVal = seq.get();
[6/8] ignite git commit: ignite-5075
Posted by sb...@apache.org.
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/55cc4b56
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/55cc4b56
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/55cc4b56
Branch: refs/heads/ignite-5075-cacheStart
Commit: 55cc4b56289bb67a75ac1df07f4f0c7c22cbb184
Parents: e0e5878
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 12 09:21:01 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 12 09:21:01 2017 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionsExchangeFuture.java | 3 +--
.../ignite/internal/GridNodeMetricsLogSelfTest.java | 13 ++++---------
2 files changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/55cc4b56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 7cec781..477d0ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -59,7 +59,6 @@ import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
-import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -319,7 +318,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param exchActions Exchange actions.
*/
public void exchangeActions(ExchangeActions exchActions) {
- assert exchActions == null || !exchActions.empty();
+ assert exchActions == null || !exchActions.empty() : exchActions;
assert evtLatch != null && evtLatch.getCount() == 1L : this;
this.exchActions = exchActions;
http://git-wip-us.apache.org/repos/asf/ignite/blob/55cc4b56/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
index 13fae24..ea298e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
@@ -35,12 +35,6 @@ import org.apache.log4j.WriterAppender;
@SuppressWarnings({"ProhibitedExceptionDeclared"})
@GridCommonTest(group = "Kernal")
public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
- /** */
-
- public GridNodeMetricsLogSelfTest() {
- super(false);
- }
-
/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -51,6 +45,7 @@ public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
return cfg;
}
+ /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
}
@@ -80,11 +75,11 @@ public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
cache2.put(2, "two");
- Thread.sleep(10000);
+ Thread.sleep(10_000);
//Check that nodes are alive
- assert cache1.get(1).equals("one");
- assert cache2.get(2).equals("two");
+ assertEquals("one", cache1.get(1));
+ assertEquals("two", cache2.get(2));
String fullLog = strWr.toString();
[5/8] ignite git commit: ignite-4932 When possible for cache 'get'
read directly from offheap without entry creation.
Posted by sb...@apache.org.
ignite-4932 When possible for cache 'get' read directly from offheap without entry creation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01671827
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01671827
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01671827
Branch: refs/heads/ignite-5075-cacheStart
Commit: 01671827411ed6043e6bfb80514e3ff57fb40b18
Parents: 7ea5830
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 12 09:06:48 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 12 09:06:48 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 129 ++++--
.../cache/GridCacheConcurrentMap.java | 2 -
.../cache/GridCacheConcurrentMapImpl.java | 5 +-
.../processors/cache/GridCacheContext.java | 12 +
.../processors/cache/GridCacheEventManager.java | 32 ++
.../processors/cache/GridCacheMapEntry.java | 14 +-
.../cache/GridCacheMapEntryFactory.java | 6 +-
.../processors/cache/GridNoStorageCacheMap.java | 8 +-
.../cache/IgniteCacheOffheapManager.java | 7 +
.../cache/IgniteCacheOffheapManagerImpl.java | 17 +
.../distributed/GridDistributedCacheEntry.java | 8 +-
.../dht/GridCachePartitionedConcurrentMap.java | 10 +-
.../distributed/dht/GridDhtCacheAdapter.java | 8 +-
.../distributed/dht/GridDhtCacheEntry.java | 8 +-
.../distributed/dht/GridDhtLocalPartition.java | 4 +-
.../dht/GridPartitionedGetFuture.java | 158 +++++---
.../dht/GridPartitionedSingleGetFuture.java | 141 ++++---
.../dht/atomic/GridDhtAtomicCache.java | 226 ++++++-----
.../dht/atomic/GridDhtAtomicCacheEntry.java | 11 +-
.../dht/colocated/GridDhtColocatedCache.java | 222 ++++++-----
.../colocated/GridDhtColocatedCacheEntry.java | 11 +-
.../colocated/GridDhtDetachedCacheEntry.java | 10 +-
.../distributed/near/GridNearCacheAdapter.java | 6 +-
.../distributed/near/GridNearCacheEntry.java | 8 +-
.../cache/distributed/near/GridNearTxLocal.java | 7 +-
.../processors/cache/local/GridLocalCache.java | 6 +-
.../cache/local/GridLocalCacheEntry.java | 8 +-
.../local/atomic/GridLocalAtomicCache.java | 188 +++++----
.../ignite/spi/discovery/tcp/ServerImpl.java | 2 +
.../cache/IgniteCacheNoSyncForGetTest.java | 395 +++++++++++++++++++
.../IgniteCacheExpiryPolicyAbstractTest.java | 2 +-
.../loadtests/hashmap/GridHashMapLoadTest.java | 4 +-
.../testsuites/IgniteCacheTestSuite2.java | 3 +
.../cache/IgniteGetFromComputeBenchmark.java | 167 ++++++++
34 files changed, 1339 insertions(+), 506 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 0b1ab74..694f4b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -944,7 +944,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Entry (never {@code null}).
*/
public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) {
- GridCacheEntryEx e = map.putEntryIfObsoleteOrAbsent(topVer, key, null, true, false);
+ GridCacheEntryEx e = map.putEntryIfObsoleteOrAbsent(topVer, key, true, false);
assert e != null;
@@ -966,7 +966,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
cur = map.putEntryIfObsoleteOrAbsent(
topVer,
key,
- null,
create, touch);
}
@@ -1965,58 +1964,104 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough();
+ boolean readNoEntry = ctx.readNoEntry(expiry, readerArgs != null);
+
for (KeyCacheObject key : keys) {
while (true) {
- GridCacheEntryEx entry = entryEx(key);
-
- if (entry == null) {
- if (!skipVals && ctx.config().isStatisticsEnabled())
- ctx.cache().metrics0().onRead(false);
-
- break;
- }
-
try {
- EntryGetResult res;
+ EntryGetResult res = null;
boolean evt = !skipVals;
boolean updateMetrics = !skipVals;
- if (storeEnabled) {
- res = entry.innerGetAndReserveForLoad(updateMetrics,
- evt,
- subjId,
- taskName,
- expiry,
- !deserializeBinary,
- readerArgs);
+ GridCacheEntryEx entry = null;
- assert res != null;
+ boolean skipEntry = readNoEntry;
- if (res.value() == null) {
- if (misses == null)
- misses = new HashMap<>();
+ if (readNoEntry) {
+ CacheDataRow row = ctx.offheap().read(key);
- misses.put(key, res);
+ if (row != null) {
+ long expireTime = row.expireTime();
+
+ if (expireTime != 0) {
+ if (expireTime > U.currentTimeMillis()) {
+ res = new EntryGetWithTtlResult(row.value(),
+ row.version(),
+ false,
+ expireTime,
+ 0);
+ }
+ else
+ skipEntry = false;
+ }
+ else
+ res = new EntryGetResult(row.value(), row.version(), false);
+ }
+
+ if (res != null) {
+ if (evt) {
+ ctx.events().readEvent(key,
+ null,
+ row.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
- res = null;
+ if (updateMetrics && ctx.cache().configuration().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(true);
}
+ else if (storeEnabled)
+ skipEntry = false;
}
- else {
- res = entry.innerGetVersioned(
- null,
- null,
- updateMetrics,
- evt,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary,
- readerArgs);
-
- if (res == null)
- ctx.evicts().touch(entry, topVer);
+
+ if (!skipEntry) {
+ entry = entryEx(key);
+
+ if (entry == null) {
+ if (!skipVals && ctx.config().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(false);
+
+ break;
+ }
+
+ if (storeEnabled) {
+ res = entry.innerGetAndReserveForLoad(updateMetrics,
+ evt,
+ subjId,
+ taskName,
+ expiry,
+ !deserializeBinary,
+ readerArgs);
+
+ assert res != null;
+
+ if (res.value() == null) {
+ if (misses == null)
+ misses = new HashMap<>();
+
+ misses.put(key, res);
+
+ res = null;
+ }
+ }
+ else {
+ res = entry.innerGetVersioned(
+ null,
+ null,
+ updateMetrics,
+ evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary,
+ readerArgs);
+
+ if (res == null)
+ ctx.evicts().touch(entry, topVer);
+ }
}
if (res != null) {
@@ -2029,7 +2074,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
true,
needVer);
- if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
+ if (entry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)))
ctx.evicts().touch(entry, topVer);
if (keysSize == 1)
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
index 9378f74..0fe5c9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
@@ -38,7 +38,6 @@ public interface GridCacheConcurrentMap {
/**
* @param topVer Topology version.
* @param key Key.
- * @param val Value.
* @param create Create flag.
* @return Existing or new GridCacheMapEntry. Will return {@code null} if entry is obsolete or absent and create
* flag is set to {@code false}. Will also return {@code null} if create flag is set to {@code true}, but entry
@@ -47,7 +46,6 @@ public interface GridCacheConcurrentMap {
@Nullable public GridCacheMapEntry putEntryIfObsoleteOrAbsent(
AffinityTopologyVersion topVer,
KeyCacheObject key,
- @Nullable CacheObject val,
boolean create,
boolean touch);
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
index 76d961a..2c262df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
@@ -107,7 +107,6 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
/** {@inheritDoc} */
@Nullable @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(final AffinityTopologyVersion topVer,
KeyCacheObject key,
- @Nullable final CacheObject val,
final boolean create,
final boolean touch) {
GridCacheMapEntry cur = null;
@@ -135,7 +134,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
reserved = true;
}
- created0 = factory.create(ctx, topVer, key, key.hashCode(), val);
+ created0 = factory.create(ctx, topVer, key);
}
cur = created = created0;
@@ -158,7 +157,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
reserved = true;
}
- created0 = factory.create(ctx, topVer, key, key.hashCode(), val);
+ created0 = factory.create(ctx, topVer, key);
}
cur = created = created0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 8d562c5..a0489fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2042,6 +2042,18 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * Checks if it is possible to read directly from data memory without entry creation (this
+ * is an optimization to avoid unnecessary blocking synchronization on the cache entry).
+ *
+ * @param expiryPlc Optional expiry policy for read operation.
+ * @param readers {@code True} if near cache readers need to be updated.
+ * @return {@code True} if it is possible to directly read offheap instead of using {@link GridCacheEntryEx#innerGet}.
+ */
+ public boolean readNoEntry(@Nullable IgniteCacheExpiryPolicy expiryPlc, boolean readers) {
+ return !config().isOnheapCacheEnabled() && !readers && expiryPlc == null;
+ }
+
+ /**
* @return {@code True} if fast eviction is allowed.
*/
public boolean allowFastEviction() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index be5b539..687b132 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED;
@@ -62,6 +63,37 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
}
/**
+ * @param key Key for event.
+ * @param tx Possible surrounding transaction.
+ * @param val Read value.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param keepBinary Keep binary flag.
+ */
+ public void readEvent(KeyCacheObject key,
+ @Nullable IgniteInternalTx tx,
+ @Nullable CacheObject val,
+ @Nullable UUID subjId,
+ @Nullable String taskName,
+ boolean keepBinary) {
+ if (isRecordable(EVT_CACHE_OBJECT_READ)) {
+ addEvent(cctx.affinity().partition(key),
+ key,
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ val,
+ val != null,
+ val,
+ val != null,
+ subjId,
+ null,
+ taskName,
+ keepBinary);
+ }
+ }
+
+ /**
* @param part Partition.
* @param key Key for the event.
* @param tx Possible surrounding transaction.
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 21c58fa..edf90d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -169,14 +169,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @param cctx Cache context.
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
protected GridCacheMapEntry(
GridCacheContext<?, ?> cctx,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridCacheMapEntry.class);
@@ -186,15 +182,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert key != null;
this.key = key;
- this.hash = hash;
+ this.hash = key.hashCode();
this.cctx = cctx;
- val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
-
- synchronized (this) {
- value(val);
- }
-
ver = cctx.versions().next();
startVer = ver.order();
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java
index 4ee9385..d3e3921 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java
@@ -27,15 +27,11 @@ public interface GridCacheMapEntryFactory {
* @param ctx Cache registry.
* @param topVer Topology version.
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
* @return New cache entry.
*/
public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
index 00827ee..14a8482 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
@@ -45,10 +45,12 @@ public class GridNoStorageCacheMap implements GridCacheConcurrentMap {
}
/** {@inheritDoc} */
- @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key,
- @Nullable CacheObject val, boolean create, boolean touch) {
+ @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer,
+ KeyCacheObject key,
+ boolean create,
+ boolean touch) {
if (create)
- return new GridDhtCacheEntry(ctx, topVer, key, key.hashCode(), val);
+ return new GridDhtCacheEntry(ctx, topVer, key);
else
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 9eb5368..b476aeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -71,6 +71,13 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
@Nullable public CacheDataRow read(GridCacheMapEntry entry) throws IgniteCheckedException;
/**
+ * @param key Key.
+ * @return Cached row, if available, null otherwise.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public CacheDataRow read(KeyCacheObject key) throws IgniteCheckedException;
+
+ /**
* @param p Partition.
* @return Data store.
* @throws IgniteCheckedException If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 650f65e..099840a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -362,6 +362,23 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
+ @Nullable @Override public CacheDataRow read(KeyCacheObject key) throws IgniteCheckedException {
+ CacheDataRow row;
+
+ if (cctx.isLocal())
+ row = locCacheDataStore.find(key);
+ else {
+ GridDhtLocalPartition part = cctx.topology().localPartition(cctx.affinity().partition(key), null, false);
+
+ row = part != null ? dataStore(part).find(key) : null;
+ }
+
+ assert row == null || row.value() != null : row;
+
+ return row;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean containsKey(GridCacheMapEntry entry) {
try {
return read(entry) != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index f518934..e7675b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -49,16 +49,12 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
/**
* @param ctx Cache context.
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
public GridDistributedCacheEntry(
GridCacheContext ctx,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- super(ctx, key, hash, val);
+ super(ctx, key);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
index 357bf89..f021b65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
@@ -79,20 +79,22 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
}
/** {@inheritDoc} */
- @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key,
- @Nullable CacheObject val, boolean create, boolean touch) {
+ @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer,
+ KeyCacheObject key,
+ boolean create,
+ boolean touch) {
while (true) {
GridDhtLocalPartition part = localPartition(key, topVer, create);
if (part == null)
return null;
- GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, key, val, create, touch);
+ GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, key, create, touch);
if (res != null || !create)
return res;
- // Otherwise parttion was concurrently evicted and should be re-created on next iteration.
+ // Otherwise partition was concurrently evicted and should be re-created on next iteration.
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 7e6ae81..121c734 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -246,11 +246,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
@Override public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- return new GridDhtCacheEntry(ctx, topVer, key, hash, val);
+ return new GridDhtCacheEntry(ctx, topVer, key);
}
};
}
@@ -428,7 +426,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @return Cache entry.
*/
protected GridDistributedCacheEntry createEntry(KeyCacheObject key) {
- return new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0);
+ return new GridDhtDetachedCacheEntry(ctx, key);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 4c22090..be7805f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -76,17 +76,13 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
* @param ctx Cache context.
* @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
public GridDhtCacheEntry(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- super(ctx, key, hash, val);
+ super(ctx, key);
// Record this entry with partition.
int p = cctx.affinity().partition(key);
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 5425954..4208a98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -900,7 +900,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
CacheDataRow row = it0.next();
GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(cctx.affinity().affinityTopologyVersion(),
- row.key(), null, true, false);
+ row.key(),
+ true,
+ false);
if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
if (rec) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 4dc4eb4..362432c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -445,75 +446,109 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
GridDhtCacheAdapter<K, V> cache = cache();
+ boolean readNoEntry = cctx.readNoEntry(expiryPlc, false);
+ boolean evt = !skipVals;
+
while (true) {
try {
- GridCacheEntryEx entry = cache.entryEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- EntryGetResult getRes = null;
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- getRes = entry.innerGetVersioned(
- null,
- null,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary,
- null);
-
- if (getRes != null) {
- v = getRes.value();
- ver = getRes.version();
+ boolean skipEntry = readNoEntry;
+
+ EntryGetResult getRes = null;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (readNoEntry) {
+ CacheDataRow row = cctx.offheap().read(key);
+
+ if (row != null) {
+ long expireTime = row.expireTime();
+
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+ v = row.value();
+
+ if (needVer)
+ ver = row.version();
+
+ if (evt) {
+ cctx.events().readEvent(key,
+ null,
+ row.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
}
+ else
+ skipEntry = false;
}
- else {
- v = entry.innerGet(
- null,
- null,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
- }
+ }
- cache.context().evicts().touch(entry, topVer);
+ if (!skipEntry) {
+ GridCacheEntryEx entry = cache.entryEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ if (needVer) {
+ getRes = entry.innerGetVersioned(
+ null,
+ null,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary,
+ null);
+
+ if (getRes != null) {
+ v = getRes.value();
+ ver = getRes.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+ }
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- if (isNew && entry.markObsoleteIfEmpty(ver))
- cache.removeEntry(entry);
- }
- else {
- cctx.addResult(locVals,
- key,
- v,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- true,
- getRes,
- ver,
- 0,
- 0,
- needVer);
-
- return true;
+ cache.context().evicts().touch(entry, topVer);
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ cache.removeEntry(entry);
+ }
}
}
+ if (v != null) {
+ cctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ true,
+ getRes,
+ ver,
+ 0,
+ 0,
+ needVer);
+
+ return true;
+ }
+
boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
// Entry not found, do not continue search if topology did not change and there is no store.
@@ -604,9 +639,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
*/
private class MiniFuture extends GridFutureAdapter<Map<K, V>> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private final IgniteUuid futId = IgniteUuid.randomUuid();
/** Node ID. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index dbf1fe1..63ed9a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
@@ -288,7 +289,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
expiryPlc != null ? expiryPlc.forCreate() : -1L,
expiryPlc != null ? expiryPlc.forAccess() : -1L,
skipVals,
- /**add reader*/false,
+ /*add reader*/false,
needVer,
cctx.deploymentEnabled(),
recovery);
@@ -347,67 +348,101 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
GridDhtCacheAdapter colocated = cctx.dht();
+ boolean readNoEntry = cctx.readNoEntry(expiryPlc, false);
+ boolean evt = !skipVals;
+
while (true) {
try {
- GridCacheEntryEx entry = colocated.entryEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- EntryGetResult res = entry.innerGetVersioned(
- null,
- null,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- true,
- null);
-
- if (res != null) {
- v = res.value();
- ver = res.version();
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ boolean skipEntry = readNoEntry;
+
+ if (readNoEntry) {
+ CacheDataRow row = cctx.offheap().read(key);
+
+ if (row != null) {
+ long expireTime = row.expireTime();
+
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+ v = row.value();
+
+ if (needVer)
+ ver = row.version();
+
+ if (evt) {
+ cctx.events().readEvent(key,
+ null,
+ row.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
}
+ else
+ skipEntry = false;
}
- else {
- v = entry.innerGet(
- null,
- null,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- true);
- }
+ }
- colocated.context().evicts().touch(entry, topVer);
+ if (!skipEntry) {
+ GridCacheEntryEx entry = colocated.entryEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ if (needVer) {
+ EntryGetResult res = entry.innerGetVersioned(
+ null,
+ null,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ true,
+ null);
+
+ if (res != null) {
+ v = res.value();
+ ver = res.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ true);
+ }
+
+ colocated.context().evicts().touch(entry, topVer);
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- if (isNew && entry.markObsoleteIfEmpty(ver))
- colocated.removeEntry(entry);
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ colocated.removeEntry(entry);
+ }
}
- else {
- if (!skipVals && cctx.config().isStatisticsEnabled())
- cctx.cache().metrics0().onRead(true);
+ }
- if (!skipVals)
- setResult(v, ver);
- else
- setSkipValueResult(true, ver);
+ if (v != null) {
+ if (!skipVals && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(true);
- return true;
- }
+ if (!skipVals)
+ setResult(v, ver);
+ else
+ setSkipValueResult(true, ver);
+
+ return true;
}
boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e477592..47572fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
@@ -196,11 +197,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- return new GridDhtAtomicCacheEntry(ctx, topVer, key, hash, val);
+ return new GridDhtAtomicCacheEntry(ctx, topVer, key);
}
};
}
@@ -1473,105 +1472,156 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
+ final boolean evt = !skipVals;
+
// Optimisation: try to resolve value locally and escape 'get future' creation.
if (!forcePrimary && ctx.affinityNode()) {
- Map<K, V> locVals = U.newHashMap(keys.size());
-
- boolean success = true;
-
- // Optimistically expect that all keys are available locally (avoid creation of get future).
- for (KeyCacheObject key : keys) {
- GridCacheEntryEx entry = null;
-
- while (true) {
- try {
- entry = entryEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- EntryGetResult getRes = null;
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- getRes = entry.innerGetVersioned(
- null,
- null,
- /*update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiry,
+ try {
+ Map<K, V> locVals = U.newHashMap(keys.size());
+
+ boolean success = true;
+ boolean readNoEntry = ctx.readNoEntry(expiry, false);
+
+ // Optimistically expect that all keys are available locally (avoid creation of get future).
+ for (KeyCacheObject key : keys) {
+ if (readNoEntry) {
+ CacheDataRow row = ctx.offheap().read(key);
+
+ if (row != null) {
+ long expireTime = row.expireTime();
+
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+ ctx.addResult(locVals,
+ key,
+ row.value(),
+ skipVals,
+ false,
+ deserializeBinary,
true,
- null);
-
- if (getRes != null) {
- v = getRes.value();
- ver = getRes.version();
- }
- }
- else {
- v = entry.innerGet(
null,
- null,
- /*read-through*/false,
- /*update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary);
- }
-
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- GridCacheVersion obsoleteVer = context().versions().next();
-
- if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
- removeEntry(entry);
-
- success = false;
+ row.version(),
+ 0,
+ 0,
+ needVer);
+
+ if (evt) {
+ ctx.events().readEvent(key,
+ null,
+ row.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
}
else
- ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true,
- getRes, ver, 0, 0, needVer);
+ success = false;
}
else
success = false;
-
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, retry.
}
- catch (GridDhtInvalidPartitionException ignored) {
- success = false;
+ else {
+ GridCacheEntryEx entry = null;
+
+ while (true) {
+ try {
+ entry = entryEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ EntryGetResult getRes = null;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ getRes = entry.innerGetVersioned(
+ null,
+ null,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ true,
+ null);
+
+ if (getRes != null) {
+ v = getRes.value();
+ ver = getRes.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+ }
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(context().versions().next()))
+ removeEntry(entry);
+
+ success = false;
+ }
+ else {
+ ctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ getRes,
+ ver,
+ 0,
+ 0,
+ needVer);
+ }
+ }
+ else
+ success = false;
- break; // While.
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- finally {
- if (entry != null)
- ctx.evicts().touch(entry, topVer);
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, retry.
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ success = false;
+
+ break; // While.
+ }
+ finally {
+ if (entry != null)
+ ctx.evicts().touch(entry, topVer);
+ }
+ }
}
- }
- if (!success)
- break;
- else if (!skipVals && ctx.config().isStatisticsEnabled())
- metrics0().onRead(true);
- }
+ if (!success)
+ break;
+ else if (!skipVals && ctx.config().isStatisticsEnabled())
+ metrics0().onRead(true);
+ }
- if (success) {
- sendTtlUpdateRequest(expiry);
+ if (success) {
+ sendTtlUpdateRequest(expiry);
- return new GridFinishedFuture<>(locVals);
+ return new GridFinishedFuture<>(locVals);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
index 3f014d5..b0c9a64 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -33,17 +32,13 @@ public class GridDhtAtomicCacheEntry extends GridDhtCacheEntry {
* @param ctx Cache context.
* @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
- public GridDhtAtomicCacheEntry(
+ GridDhtAtomicCacheEntry(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- super(ctx, topVer, key, hash, val);
+ super(ctx, topVer, key);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 2292cb2..12a3912 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
@@ -120,11 +121,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
@Override public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- return new GridDhtColocatedCacheEntry(ctx, topVer, key, hash, val);
+ return new GridDhtColocatedCacheEntry(ctx, topVer, key);
}
};
}
@@ -458,118 +457,161 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
expiryPlc = expiryPolicy(null);
// Optimisation: try to resolve value locally and escape 'get future' creation.
- if (!forcePrimary) {
- Map<K, V> locVals = null;
+ if (!forcePrimary && ctx.affinityNode()) {
+ try {
+ Map<K, V> locVals = null;
- boolean success = true;
+ boolean success = true;
+ boolean readNoEntry = ctx.readNoEntry(expiryPlc, false);
+ boolean evt = !skipVals;
- // Optimistically expect that all keys are available locally (avoid creation of get future).
- for (KeyCacheObject key : keys) {
- GridCacheEntryEx entry = null;
-
- while (true) {
- try {
- entry = entryEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- EntryGetResult getRes = null;
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- getRes = entry.innerGetVersioned(
- null,
- null,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary,
- null);
-
- if (getRes != null) {
- v = getRes.value();
- ver = getRes.version();
- }
- }
- else {
- v = entry.innerGet(
- null,
- null,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
- }
-
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- GridCacheVersion obsoleteVer = context().versions().next();
+ for (KeyCacheObject key : keys) {
+ if (readNoEntry) {
+ CacheDataRow row = ctx.offheap().read(key);
- if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
- removeEntry(entry);
+ if (row != null) {
+ long expireTime = row.expireTime();
- success = false;
- }
- else {
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
if (locVals == null)
locVals = U.newHashMap(keys.size());
ctx.addResult(locVals,
key,
- v,
+ row.value(),
skipVals,
keepCacheObj,
deserializeBinary,
true,
- getRes,
- ver,
+ null,
+ row.version(),
0,
0,
needVer);
+
+ if (evt) {
+ ctx.events().readEvent(key,
+ null,
+ row.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
}
+ else
+ success = false;
}
else
success = false;
-
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, retry.
}
- catch (GridDhtInvalidPartitionException ignored) {
- success = false;
+ else {
+ GridCacheEntryEx entry = null;
+
+ while (true) {
+ try {
+ entry = entryEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ EntryGetResult getRes = null;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ getRes = entry.innerGetVersioned(
+ null,
+ null,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary,
+ null);
+
+ if (getRes != null) {
+ v = getRes.value();
+ ver = getRes.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+ }
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ GridCacheVersion obsoleteVer = context().versions().next();
+
+ if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
+ removeEntry(entry);
+
+ success = false;
+ }
+ else {
+ if (locVals == null)
+ locVals = U.newHashMap(keys.size());
+
+ ctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObj,
+ deserializeBinary,
+ true,
+ getRes,
+ ver,
+ 0,
+ 0,
+ needVer);
+ }
+ }
+ else
+ success = false;
- break; // While.
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- finally {
- if (entry != null)
- context().evicts().touch(entry, topVer);
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, retry.
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ success = false;
+
+ break; // While.
+ }
+ finally {
+ if (entry != null)
+ context().evicts().touch(entry, topVer);
+ }
+ }
}
- }
- if (!success)
- break;
- else if (!skipVals && ctx.config().isStatisticsEnabled())
- ctx.cache().metrics0().onRead(true);
- }
+ if (!success)
+ break;
+ else if (!skipVals && ctx.config().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(true);
+ }
- if (success) {
- sendTtlUpdateRequest(expiryPlc);
+ if (success) {
+ sendTtlUpdateRequest(expiryPlc);
- return new GridFinishedFuture<>(locVals);
+ return new GridFinishedFuture<>(locVals);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
index cc71e11..f7cc5a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -32,17 +31,13 @@ public class GridDhtColocatedCacheEntry extends GridDhtCacheEntry {
* @param ctx Cache context.
* @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
- public GridDhtColocatedCacheEntry(
+ GridDhtColocatedCacheEntry(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- super(ctx, topVer, key, hash, val);
+ super(ctx, topVer, key);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index ac81b63..404265d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
@@ -36,14 +35,9 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
/**
* @param ctx Cache context.
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
- * @param next Next entry in the linked list.
- * @param hdrId Header ID.
*/
- public GridDhtDetachedCacheEntry(GridCacheContext ctx, KeyCacheObject key, int hash, CacheObject val,
- GridCacheMapEntry next, int hdrId) {
- super(ctx, key, hash, val);
+ public GridDhtDetachedCacheEntry(GridCacheContext ctx, KeyCacheObject key) {
+ super(ctx, key);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 59d986a..0b25f58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -95,11 +95,9 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
@Override public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- return new GridNearCacheEntry(ctx, key, hash, val);
+ return new GridNearCacheEntry(ctx, key);
}
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index fa098df..b17d0b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -66,16 +66,12 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
/**
* @param ctx Cache context.
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
public GridNearCacheEntry(
GridCacheContext ctx,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- super(ctx, key, hash, val);
+ super(ctx, key);
part = ctx.affinity().partition(key);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index f795ddc..9ad084e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3878,17 +3878,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
/**
* @param cctx Cache context.
* @param key Key.
- * @param val Value.
* @param filter Filter.
* @return {@code True} if filter passed.
*/
private boolean isAll(GridCacheContext cctx,
KeyCacheObject key,
- CacheObject val,
+ final CacheObject val0,
CacheEntryPredicate[] filter) {
- GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) {
+ GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key) {
@Nullable @Override public CacheObject peekVisibleValue() {
- return rawGet();
+ return val0;
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 94f618a..5e3dc3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -86,11 +86,9 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
@Override public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- return new GridLocalCacheEntry(ctx, key, hash, val);
+ return new GridLocalCacheEntry(ctx, key);
}
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index 3e93917..421b32a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -38,16 +38,12 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
/**
* @param ctx Cache registry.
* @param key Cache key.
- * @param hash Key hash value.
- * @param val Entry value.
*/
GridLocalCacheEntry(
GridCacheContext ctx,
- KeyCacheObject key,
- int hash,
- CacheObject val
+ KeyCacheObject key
) {
- super(ctx, key, hash, val);
+ super(ctx, key);
}
/** {@inheritDoc} */
[7/8] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-5075-cacheStart
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-5075-cacheStart
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0f258940
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0f258940
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0f258940
Branch: refs/heads/ignite-5075-cacheStart
Commit: 0f2589407cdd5aaf771eafcd56a4ba82dbb897c7
Parents: 55cc4b5 0167182
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 12 09:21:25 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 12 09:21:25 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 129 ++++--
.../cache/GridCacheConcurrentMap.java | 2 -
.../cache/GridCacheConcurrentMapImpl.java | 5 +-
.../processors/cache/GridCacheContext.java | 12 +
.../processors/cache/GridCacheEventManager.java | 32 ++
.../processors/cache/GridCacheMapEntry.java | 14 +-
.../cache/GridCacheMapEntryFactory.java | 6 +-
.../processors/cache/GridNoStorageCacheMap.java | 8 +-
.../cache/IgniteCacheOffheapManager.java | 7 +
.../cache/IgniteCacheOffheapManagerImpl.java | 17 +
.../distributed/GridDistributedCacheEntry.java | 8 +-
.../dht/GridCachePartitionedConcurrentMap.java | 10 +-
.../distributed/dht/GridDhtCacheAdapter.java | 8 +-
.../distributed/dht/GridDhtCacheEntry.java | 8 +-
.../distributed/dht/GridDhtLocalPartition.java | 4 +-
.../dht/GridPartitionedGetFuture.java | 158 +++++---
.../dht/GridPartitionedSingleGetFuture.java | 141 ++++---
.../dht/atomic/GridDhtAtomicCache.java | 226 ++++++-----
.../dht/atomic/GridDhtAtomicCacheEntry.java | 11 +-
.../dht/colocated/GridDhtColocatedCache.java | 222 ++++++-----
.../colocated/GridDhtColocatedCacheEntry.java | 11 +-
.../colocated/GridDhtDetachedCacheEntry.java | 10 +-
.../distributed/near/GridNearCacheAdapter.java | 6 +-
.../distributed/near/GridNearCacheEntry.java | 8 +-
.../cache/distributed/near/GridNearTxLocal.java | 7 +-
.../processors/cache/local/GridLocalCache.java | 6 +-
.../cache/local/GridLocalCacheEntry.java | 8 +-
.../local/atomic/GridLocalAtomicCache.java | 188 +++++----
.../GridCacheAtomicSequenceImpl.java | 101 +----
.../ignite/spi/discovery/tcp/ServerImpl.java | 2 +
.../cache/IgniteCacheNoSyncForGetTest.java | 395 +++++++++++++++++++
.../IgniteCacheExpiryPolicyAbstractTest.java | 2 +-
.../loadtests/hashmap/GridHashMapLoadTest.java | 4 +-
.../testsuites/IgniteCacheTestSuite2.java | 3 +
.../query/h2/twostep/GridMergeIndexSorted.java | 3 +
.../query/IgniteSqlSplitterSelfTest.java | 68 ++++
.../IgniteStartStopTest.cs | 2 +
.../dotnet/Apache.Ignite.Core/Ignition.cs | 16 +-
.../cache/IgniteGetFromComputeBenchmark.java | 167 ++++++++
39 files changed, 1445 insertions(+), 590 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f258940/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f258940/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
[4/8] ignite git commit: ignite-4932 When possible for cache 'get'
read directly from offheap without entry creation.
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index e1d4484..56041ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -384,7 +385,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
UUID subjId = ctx.subjectIdPerCall(null, opCtx);
- Map<K, V> vals = new HashMap<>(keys.size(), 1.0f);
+ Map<K, V> vals = U.newHashMap(keys.size());
if (keyCheck)
validateCacheKeys(keys);
@@ -392,97 +393,142 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null);
boolean success = true;
+ boolean readNoEntry = ctx.readNoEntry(expiry, false);
+ final boolean evt = !skipVals;
for (K key : keys) {
if (key == null)
throw new NullPointerException("Null key.");
- GridCacheEntryEx entry = null;
-
KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
- while (true) {
- try {
- entry = entryEx(cacheKey);
+ boolean skipEntry = readNoEntry;
- if (entry != null) {
- CacheObject v;
+ if (readNoEntry) {
+ CacheDataRow row = ctx.offheap().read(cacheKey);
- if (needVer) {
- EntryGetResult res = entry.innerGetVersioned(
- null,
- null,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary,
- null);
-
- if (res != null) {
- ctx.addResult(
- vals,
- cacheKey,
- res,
- skipVals,
- false,
- deserializeBinary,
- true,
- needVer);
- }
- else
- success = false;
- }
- else {
- v = entry.innerGet(
- null,
+ if (row != null) {
+ long expireTime = row.expireTime();
+
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+ ctx.addResult(vals,
+ cacheKey,
+ row.value(),
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ null,
+ row.version(),
+ 0,
+ 0,
+ needVer);
+
+ if (configuration().isStatisticsEnabled() && !skipVals)
+ metrics0().onRead(true);
+
+ if (evt) {
+ ctx.events().readEvent(cacheKey,
null,
- /*read-through*/false,
- /**update-metrics*/true,
- /**event*/!skipVals,
+ row.value(),
subjId,
- null,
taskName,
- expiry,
!deserializeBinary);
+ }
+ }
+ else
+ skipEntry = false;
+ }
+ else
+ success = false;
+ }
- if (v != null) {
- ctx.addResult(vals,
- cacheKey,
- v,
- skipVals,
- false,
- deserializeBinary,
- true,
+ if (!skipEntry) {
+ GridCacheEntryEx entry = null;
+
+ while (true) {
+ try {
+ entry = entryEx(cacheKey);
+
+ if (entry != null) {
+ CacheObject v;
+
+ if (needVer) {
+ EntryGetResult res = entry.innerGetVersioned(
null,
- 0,
- 0);
+ null,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary,
+ null);
+
+ if (res != null) {
+ ctx.addResult(
+ vals,
+ cacheKey,
+ res,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ needVer);
+ }
+ else
+ success = false;
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*read-through*/false,
+ /*update-metrics*/true,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+
+ if (v != null) {
+ ctx.addResult(vals,
+ cacheKey,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ null,
+ 0,
+ 0);
+ }
+ else
+ success = false;
}
- else
- success = false;
}
- }
- else {
- if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
- metrics0().onRead(false);
+ else {
+ if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
+ metrics0().onRead(false);
- success = false;
+ success = false;
+ }
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, retry.
+ }
+ finally {
+ if (entry != null)
+ ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
}
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, retry.
+ if (!success && storeEnabled)
+ break;
}
- finally {
- if (entry != null)
- ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
- }
-
- if (!success && storeEnabled)
- break;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 663040d..5961b8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -4099,6 +4099,8 @@ class ServerImpl extends TcpDiscoveryImpl {
DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();
+ assert dataPacket != null : msg;
+
if (dataPacket.hasJoiningNodeData())
spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
new file mode 100644
index 0000000..9250e0b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ModifiedExpiryPolicy;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+@SuppressWarnings("unchecked")
+public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static volatile CountDownLatch processorStartLatch;
+
+ /** */
+ private static volatile CountDownLatch hangLatch;
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid(0);
+
+ client = true;
+
+ startGrid(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicGet() throws Exception {
+ getTest(ATOMIC);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxGet() throws Exception {
+ getTest(TRANSACTIONAL);
+ }
+
+ /**
+ * @param atomicityMode Cache atomicity mode.
+ * @throws Exception If failed.
+ */
+ private void getTest(CacheAtomicityMode atomicityMode) throws Exception {
+ boolean getAll[] = {true, false};
+ boolean cfgExpiryPlc[] = {false};
+ boolean withExpiryPlc[] = {false};
+ boolean heapCache[] = {false};
+
+ for (boolean getAll0 : getAll) {
+ for (boolean expiryPlc0 : cfgExpiryPlc) {
+ for (boolean withExpiryPlc0 : withExpiryPlc) {
+ for (boolean heapCache0 : heapCache)
+ doGet(atomicityMode, heapCache0, getAll0, expiryPlc0, withExpiryPlc0);
+ }
+ }
+ }
+ }
+
+ /**
+ * @param atomicityMode Cache atomicity mode.
+ * @param heapCache Heap cache flag.
+ * @param getAll Test getAll flag.
+ * @param cfgExpiryPlc Configured expiry policy flag.
+ * @param withExpiryPlc Custom expiry policy flag.
+ * @throws Exception If failed.
+ */
+ private void doGet(CacheAtomicityMode atomicityMode,
+ boolean heapCache,
+ final boolean getAll,
+ final boolean cfgExpiryPlc,
+ final boolean withExpiryPlc) throws Exception {
+ log.info("Test get [getAll=" + getAll + ", cfgExpiryPlc=" + cfgExpiryPlc + ']');
+
+ Ignite srv = ignite(0);
+
+ Ignite client = ignite(1);
+
+ final IgniteCache cache = client.createCache(cacheConfiguration(atomicityMode, heapCache, cfgExpiryPlc));
+
+ final Map<Object, Object> data = new HashMap<>();
+
+ data.put(1, 1);
+ data.put(2, 2);
+
+ try {
+ // Get from compute closure.
+ {
+ cache.putAll(data);
+
+ hangLatch = new CountDownLatch(1);
+ processorStartLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ if (getAll)
+ cache.invokeAll(data.keySet(), new HangEntryProcessor());
+ else
+ cache.invoke(1, new HangEntryProcessor());
+
+ return null;
+ }
+ });
+
+ try {
+ boolean wait = processorStartLatch.await(30, TimeUnit.SECONDS);
+
+ assertTrue(wait);
+
+ if (getAll) {
+ assertEquals(data, client.compute().affinityCall(cache.getName(), 1,
+ new GetAllClosure(data.keySet(), cache.getName(), withExpiryPlc)));
+ }
+ else {
+ assertEquals(1, client.compute().affinityCall(cache.getName(), 1,
+ new GetClosure(1, cache.getName(), withExpiryPlc)));
+ }
+
+ hangLatch.countDown();
+
+ fut.get();
+ }
+ finally {
+ hangLatch.countDown();
+ }
+ }
+
+ // Local get.
+ {
+ cache.putAll(data);
+
+ hangLatch = new CountDownLatch(1);
+ processorStartLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ if (getAll)
+ cache.invokeAll(data.keySet(), new HangEntryProcessor());
+ else
+ cache.invoke(1, new HangEntryProcessor());
+
+ return null;
+ }
+ });
+
+ try {
+ boolean wait = processorStartLatch.await(30, TimeUnit.SECONDS);
+
+ assertTrue(wait);
+
+ IgniteCache srvCache = srv.cache(cache.getName());
+
+ if (withExpiryPlc)
+ srvCache = srvCache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create());
+
+ if (getAll) {
+ assertEquals(data, srvCache.getAll(data.keySet()));
+ assertEquals(data.size(), srvCache.getEntries(data.keySet()).size());
+ }
+ else {
+ assertEquals(1, srvCache.get(1));
+ assertEquals(1, srvCache.getEntry(1).getValue());
+ }
+
+ hangLatch.countDown();
+
+ fut.get();
+ }
+ finally {
+ hangLatch.countDown();
+ }
+ }
+ }
+ finally {
+ client.destroyCache(cache.getName());
+ }
+ }
+
+ /**
+ * @param atomicityMode Atomicity mode.
+ * @param heapCache Heap cache flag.
+ * @param expiryPlc Expiry policy flag.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode,
+ boolean heapCache,
+ boolean expiryPlc) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setOnheapCacheEnabled(heapCache);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setName("testCache");
+
+ if (expiryPlc)
+ ccfg.setExpiryPolicyFactory(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES));
+
+ return ccfg;
+ }
+
+ /**
+ *
+ */
+ static class HangEntryProcessor implements CacheEntryProcessor {
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry entry, Object... arguments) {
+ assert processorStartLatch != null;
+ assert hangLatch != null;
+
+ try {
+ processorStartLatch.countDown();
+
+ if (!hangLatch.await(60, TimeUnit.SECONDS))
+ throw new RuntimeException("Failed to wait for latch");
+ }
+ catch (Exception e) {
+ System.out.println("Unexpected error: " + e);
+
+ throw new EntryProcessorException(e);
+ }
+
+ entry.setValue(U.currentTimeMillis());
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ public static class GetClosure implements IgniteCallable<Object> {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** */
+ private final int key;
+
+ /** */
+ private final String cacheName;
+
+ /** */
+ private final boolean withExpiryPlc;
+
+ /**
+ * @param key Key.
+ * @param cacheName Cache name.
+ * @param withExpiryPlc Custom expiry policy flag.
+ */
+ GetClosure(int key, String cacheName, boolean withExpiryPlc) {
+ this.key = key;
+ this.cacheName = cacheName;
+ this.withExpiryPlc = withExpiryPlc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ IgniteCache cache = ignite.cache(cacheName);
+
+ if (withExpiryPlc)
+ cache = cache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create());
+
+ Object val = cache.get(key);
+
+ CacheEntry e = cache.getEntry(key);
+
+ assertEquals(val, e.getValue());
+
+ return val;
+ }
+ }
+
+ /**
+ *
+ */
+ public static class GetAllClosure implements IgniteCallable<Object> {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** */
+ private final Set<Object> keys;
+
+ /** */
+ private final String cacheName;
+
+ /** */
+ private final boolean withExpiryPlc;
+
+ /**
+ * @param keys Keys.
+ * @param cacheName Cache name.
+ * @param withExpiryPlc Custom expiry policy flag.
+ */
+ GetAllClosure(Set<Object> keys, String cacheName, boolean withExpiryPlc) {
+ this.keys = keys;
+ this.cacheName = cacheName;
+ this.withExpiryPlc = withExpiryPlc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ IgniteCache cache = ignite.cache(cacheName);
+
+ if (withExpiryPlc)
+ cache = cache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create());
+
+ Map vals = cache.getAll(keys);
+
+ Collection<CacheEntry> entries = cache.getEntries(keys);
+
+ assertEquals(vals.size(), entries.size());
+
+ for (CacheEntry entry : entries) {
+ Object val = vals.get(entry.getKey());
+
+ assertEquals(val, entry.getValue());
+ }
+
+ return vals;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 3ff1bff..2b79367 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -1009,7 +1009,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
if (cacheMode() != PARTITIONED)
return;
- factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1));
+ factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 2));
nearCache = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
index 5c12f84..7d4f90e 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
@@ -79,10 +79,8 @@ public class GridHashMapLoadTest extends GridCommonAbstractTest {
while (true) {
Integer key = i++;
- Integer val = i++;
- map.put(key, new GridCacheMapEntry(ctx, ctx.toCacheKeyObject(key),
- key.hashCode(), ctx.toCacheObject(val)) {
+ map.put(key, new GridCacheMapEntry(ctx, ctx.toCacheKeyObject(key)) {
@Override public boolean tmLock(IgniteInternalTx tx,
long timeout,
@Nullable GridCacheVersion serOrder,
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 04a3753..943c5f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheVariableTopologySelf
import org.apache.ignite.internal.processors.cache.IgniteAtomicCacheEntryProcessorNodeJoinTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorNodeJoinTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheNoSyncForGetTest;
import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest;
import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest;
@@ -267,6 +268,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(IgniteOnePhaseCommitInvokeTest.class));
+ suite.addTest(new TestSuite(IgniteCacheNoSyncForGetTest.class));
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java
new file mode 100644
index 0000000..83fe665
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Benchmark created to verify that slow EntryProcessor does not affect 'get' performance.
+ * <p>
+ * Each benchmark thread keeps one slow {@code invokeAll} in flight (see {@link SlowEntryProcessor})
+ * while every {@link #test(Map)} iteration performs a cache {@code get} through an affinity call.
+ */
+public class IgniteGetFromComputeBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
+    /** Name of the cache used both for the slow invokes and for the gets. */
+    private static final String CACHE_NAME = "atomic";
+
+    /** Compute facade used to run {@link GetClosure}. */
+    private IgniteCompute compute;
+
+    /** Async view of the benchmark cache used to start {@link SlowEntryProcessor} invocations. */
+    private IgniteCache<Integer, Object> asyncCache;
+
+    /** Future of the last {@code invokeAll} started by the current thread. */
+    private final ThreadLocal<IgniteFuture<?>> invokeFut = new ThreadLocal<>();
+
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        // Equality is allowed: preloaded keys are [0, preloadAmount), so they must fit into the range.
+        if (args.preloadAmount() > args.range())
+            throw new IllegalArgumentException("Preloading amount (\"-pa\", \"--preloadAmount\") " +
+                "must not exceed the range (\"-r\", \"--range\").");
+
+        String cacheName = cache().getName();
+
+        println(cfg, "Loading data for cache: " + cacheName);
+
+        long start = System.nanoTime();
+
+        try (IgniteDataStreamer<Object, Object> dataLdr = ignite().dataStreamer(cacheName)) {
+            for (int i = 0; i < args.preloadAmount(); i++) {
+                dataLdr.addData(i, new SampleValue(i));
+
+                // Every 100000 entries: report progress and stop early if the thread was interrupted.
+                if (i % 100000 == 0) {
+                    if (Thread.currentThread().isInterrupted())
+                        break;
+
+                    println(cfg, "Loaded entries: " + i);
+                }
+            }
+        }
+
+        println(cfg, "Finished populating data [time=" + ((System.nanoTime() - start) / 1_000_000) + "ms, " +
+            "amount=" + args.preloadAmount() + ']');
+
+        compute = ignite().compute();
+
+        asyncCache = cache().withAsync();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        IgniteFuture<?> fut = invokeFut.get();
+
+        // Keep a single slow invokeAll in flight per thread: start a new one only after the previous one is done.
+        if (fut == null || fut.isDone()) {
+            // NOTE(review): TreeSet presumably gives concurrent invokeAll calls a consistent key order - confirm.
+            Set<Integer> keys = new TreeSet<>();
+
+            for (int i = 0; i < 3; i++)
+                keys.add(nextRandom(args.range()));
+
+            asyncCache.invokeAll(keys, new SlowEntryProcessor(0));
+
+            // On the async cache view the future of the operation just started is taken from the cache itself.
+            invokeFut.set(asyncCache.future());
+        }
+
+        int key = nextRandom(args.range());
+
+        // The 'get' under test, executed through a compute affinity call for the key.
+        compute.affinityCall(CACHE_NAME, key, new GetClosure(key));
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteCache<Integer, Object> cache() {
+        return ignite().cache(CACHE_NAME);
+    }
+
+    /**
+     * Closure that reads a single key from the {@link #CACHE_NAME} cache using the injected Ignite instance.
+     */
+    public static class GetClosure implements IgniteCallable<Object> {
+        /** Ignite instance injected through {@link IgniteInstanceResource}. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Key to read. */
+        private final int key;
+
+        /**
+         * @param key Key.
+         */
+        public GetClosure(int key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return ignite.cache(CACHE_NAME).get(key);
+        }
+    }
+
+    /**
+     * Entry processor that sleeps for 10 ms and then stores the configured value into the entry.
+     */
+    public static class SlowEntryProcessor implements CacheEntryProcessor<Integer, Object, Object> {
+        /** Value written to every processed entry. */
+        private final Object val;
+
+        /**
+         * @param val Value.
+         */
+        public SlowEntryProcessor(Object val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Integer, Object> entry, Object... args) {
+            try {
+                // Artificial delay that makes the processor "slow".
+                Thread.sleep(10);
+            }
+            catch (InterruptedException ignore) {
+                // Restore the interrupt status so that callers up the stack can still observe it.
+                Thread.currentThread().interrupt();
+            }
+
+            entry.setValue(val);
+
+            return null;
+        }
+    }
+}