You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/22 16:53:24 UTC
[01/11] ignite git commit: ignite-973 Fixed atomic cache 'remove' to
always provide old value for indexing
Repository: ignite
Updated Branches:
refs/heads/master 88acd318b -> 0a41ae572
ignite-973 Fixed atomic cache 'remove' to always provide old value for indexing
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/621eb0f7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/621eb0f7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/621eb0f7
Branch: refs/heads/master
Commit: 621eb0f75bbe1a0a623229dded38a3549309eead
Parents: 8b94494
Author: sboikov <se...@inria.fr>
Authored: Mon Sep 21 21:37:52 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Mon Sep 21 21:37:52 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 37 +++++++++++++-------
.../processors/cache/GridCacheProcessor.java | 2 +-
.../processors/cache/GridCacheSwapManager.java | 24 ++++++-------
.../datastreamer/DataStreamerImpl.java | 2 --
4 files changed, 37 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f2bb646..961c792 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1588,6 +1588,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean hasValPtr = hasOffHeapPointer();
+ if (old == null)
+ old = saveValueForIndexUnlocked();
+
// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
clearIndex(old);
@@ -2163,6 +2166,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Must persist inside synchronization in non-tx mode.
cctx.store().remove(null, keyValue(false));
+ if (oldVal == null)
+ oldVal = saveValueForIndexUnlocked();
+
// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
clearIndex(oldVal);
@@ -3342,7 +3348,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
synchronized (this) {
- CacheObject expiredVal = saveValueForIndexUnlocked();
+ CacheObject expiredVal = saveOldValueUnlocked(false);
boolean hasOldBytes = hasOffHeapPointer();
@@ -3523,12 +3529,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null && qryMgr.enabled()) {
- qryMgr.store(key,
- val,
- ver,
- expireTime);
- }
+ if (qryMgr.enabled())
+ qryMgr.store(key, val, ver, expireTime);
}
catch (IgniteCheckedException e) {
throw new GridCacheIndexUpdateException(e);
@@ -3547,8 +3549,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
GridCacheQueryManager<?, ?> qryMgr = cctx.queries();
- if (qryMgr != null)
- qryMgr.remove(key(), prevVal == null ? null : prevVal);
+ if (qryMgr.enabled())
+ qryMgr.remove(key(), prevVal);
}
catch (IgniteCheckedException e) {
throw new GridCacheIndexUpdateException(e);
@@ -3562,10 +3564,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @return Previous value or {@code null}.
* @throws IgniteCheckedException If failed to retrieve previous value.
*/
- protected CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException {
+ protected final CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException {
+ return saveOldValueUnlocked(true);
+ }
+
+ /**
+ * @param qryOnly If {@code true} reads old value only if query indexing is enabled.
+ * @return Previous value or {@code null}.
+ * @throws IgniteCheckedException If failed to retrieve previous value.
+ */
+ private CacheObject saveOldValueUnlocked(boolean qryOnly) throws IgniteCheckedException {
assert Thread.holdsLock(this);
- if (cctx.queries() == null)
+ if (qryOnly && !cctx.queries().enabled())
return null;
CacheObject val = rawGetOrUnmarshalUnlocked(false);
@@ -3681,7 +3692,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (obsoleteVersionExtras() != null)
return true;
- CacheObject prev = saveValueForIndexUnlocked();
+ CacheObject prev = saveOldValueUnlocked(false);
if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
if (swap) {
@@ -3791,7 +3802,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null)
+ if (qryMgr.enabled())
qryMgr.onUnswap(key, prevVal);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c92de7d..7c16136 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2759,7 +2759,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (spaceName.equals(CU.swapSpaceName(cctx))) {
GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null) {
+ if (qryMgr.enabled()) {
try {
KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 9b6381e..d9a8b5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -696,12 +696,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
final GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+ if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr))
return null; // Not found.
swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
@Override public void apply(byte[] rmv) {
- if (qryMgr == null && cctx.config().isStatisticsEnabled())
+ if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled())
cctx.cache().metrics0().onSwapRead(rmv != null);
if (rmv != null) {
@@ -843,7 +843,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
GridCacheSwapEntry entry;
- if (qryMgr != null) {
+ if (qryMgr.enabled()) {
entry = readOffheapBeforeRemove(key, keyBytes, part);
if (entry != null) {
@@ -952,7 +952,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
ClassLoader ldr = cctx.deploy().globalLoader();
- if (qryMgr != null) { // Unswap for indexing.
+ if (qryMgr.enabled()) { // Unswap for indexing.
Iterator<SwapKey> iter = unprocessedKeys.iterator();
while (iter.hasNext()) {
@@ -967,7 +967,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
unprocessedKeys,
new IgniteBiInClosure<SwapKey, byte[]>() {
@Override public void apply(SwapKey swapKey, byte[] rmv) {
- if (qryMgr == null && cctx.config().isStatisticsEnabled())
+ if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled())
cctx.cache().metrics0().onSwapRead(rmv != null);
if (rmv != null) {
@@ -1124,7 +1124,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
*/
public GridCacheSwapEntry readOffheapBeforeRemove(KeyCacheObject key, byte[] keyBytes, int part)
throws IgniteCheckedException {
- assert cctx.queries() != null;
+ assert cctx.queries().enabled();
byte[] entryBytes = offheap.get(spaceName, part, key, keyBytes);
@@ -1155,7 +1155,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
*/
private boolean readSwapBeforeRemove(@Nullable KeyCacheObject key, SwapKey swapKey, ClassLoader ldr)
throws IgniteCheckedException {
- assert cctx.queries() != null;
+ assert cctx.queries().enabled();
byte[] entryBytes = swapMgr.read(spaceName, swapKey, ldr);
@@ -1196,7 +1196,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (offheapEnabled) {
byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext());
- if ((qryMgr == null || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
+ if ((!qryMgr.enabled() || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
offheap.removex(spaceName, part, key, keyBytes)) {
if (cctx.config().isStatisticsEnabled())
cctx.cache().metrics0().onOffHeapRemove();
@@ -1212,7 +1212,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
ClassLoader ldr = cctx.deploy().globalLoader();
- if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+ if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr))
return; // Not found.
swapMgr.remove(spaceName,
@@ -1279,7 +1279,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null)
+ if (qryMgr.enabled())
qryMgr.onSwap(key);
}
@@ -1308,7 +1308,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(),
(IgniteUuid)null, null, EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null);
- if (qryMgr != null)
+ if (qryMgr.enabled())
qryMgr.onSwap(swapEntry.key());
}
}
@@ -1330,7 +1330,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.events().addEvent(batchSwapEntry.partition(), batchSwapEntry.key(), cctx.nodeId(),
(IgniteUuid)null, null, EVT_CACHE_OBJECT_SWAPPED, null, false, null, true, null, null, null);
- if (qryMgr != null)
+ if (qryMgr.enabled())
qryMgr.onSwap(batchSwapEntry.key());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index b5d9a7d..ab2a6e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1569,8 +1569,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
- entry.unswap(false);
-
if (plc != null) {
ttl = CU.toTtl(plc.getExpiryForCreation());
[11/11] ignite git commit: Merge branch 'ignite-1.4'
Posted by vo...@apache.org.
Merge branch 'ignite-1.4'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0a41ae57
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0a41ae57
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0a41ae57
Branch: refs/heads/master
Commit: 0a41ae57215e9d2d208d33f7a46653c4be43de9c
Parents: 88acd31 1942d75
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 22 17:53:41 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 22 17:53:41 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/cache/CacheAtomicityMode.java | 17 +--
.../configuration/CacheConfiguration.java | 15 +++
.../processors/cache/GridCacheAdapter.java | 8 +-
.../processors/cache/GridCacheMapEntry.java | 51 ++++----
.../processors/cache/GridCacheProcessor.java | 10 +-
.../cache/GridCacheSwapEntryImpl.java | 31 ++++-
.../processors/cache/GridCacheSwapManager.java | 80 ++++++++-----
.../datastreamer/DataStreamerImpl.java | 2 -
.../IgniteCacheEntryListenerAbstractTest.java | 65 +++++++++-
.../IgniteCachePutRetryAbstractSelfTest.java | 33 ++++++
...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 13 +-
.../processors/query/h2/IgniteH2Indexing.java | 19 +--
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 54 ++++++---
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 11 +-
.../query/h2/opt/GridH2RowDescriptor.java | 5 +
.../processors/query/h2/opt/GridH2Table.java | 10 +-
.../cache/CacheIndexStreamerTest.java | 37 ++++--
.../processors/cache/GridCacheSwapSelfTest.java | 4 +-
.../IgniteCacheWithIndexingTestSuite.java | 2 +
20 files changed, 448 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0a41ae57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
[02/11] ignite git commit: IGNITE-1522 - Made cache entry listener
configurations transient in cache configuration
Posted by vo...@apache.org.
IGNITE-1522 - Made cache entry listener configurations transient in cache configuration
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e51fb420
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e51fb420
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e51fb420
Branch: refs/heads/master
Commit: e51fb420d1284465c7cbe55a28c2374ddf67d495
Parents: 621eb0f
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Mon Sep 21 23:29:20 2015 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Mon Sep 21 23:29:20 2015 -0700
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 15 +++++
.../IgniteCacheEntryListenerAbstractTest.java | 65 +++++++++++++++++++-
2 files changed, 79 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e51fb420/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 7d1e14d..44a3fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.ignite.configuration;
import java.io.Serializable;
import java.util.Collection;
+import java.util.HashSet;
import javax.cache.Cache;
import javax.cache.configuration.CompleteConfiguration;
import javax.cache.configuration.Factory;
@@ -1799,6 +1800,20 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return this;
}
+ /**
+ * Creates a copy of current configuration and removes all cache entry listeners.
+ * They are executed only locally and should never be sent to remote nodes.
+ *
+ * @return Configuration object that will be serialized.
+ */
+ protected Object writeReplace() {
+ CacheConfiguration<K, V> cfg = new CacheConfiguration<>(this);
+
+ cfg.listenerConfigurations = new HashSet<>();
+
+ return cfg;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e51fb420/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 78a6700..3fdd7fc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.processors.cache;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -32,11 +36,13 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
+import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
@@ -358,6 +364,34 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
/**
+ * @throws Exception If failed.
+ */
+ public void testSerialization() throws Exception {
+ if (cacheMode() == LOCAL)
+ return;
+
+ AtomicBoolean serialized = new AtomicBoolean();
+
+ NonSerializableListener lsnr = new NonSerializableListener(serialized);
+
+ jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>(
+ FactoryBuilder.factoryOf(lsnr),
+ null,
+ true,
+ false
+ ));
+
+ try {
+ startGrid(gridCount());
+ }
+ finally {
+ stopGrid(gridCount());
+ }
+
+ assertFalse(serialized.get());
+ }
+
+ /**
* @param key Key.
* @param val Value.
* @param cache Cache.
@@ -1190,4 +1224,33 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
}
-}
\ No newline at end of file
+ /**
+ */
+ public static class NonSerializableListener implements CacheEntryCreatedListener<Object, Object>, Externalizable {
+ /** */
+ private final AtomicBoolean serialized;
+
+ /**
+ * @param serialized Serialized flag.
+ */
+ public NonSerializableListener(AtomicBoolean serialized) {
+ this.serialized = serialized;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCreated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> evts)
+ throws CacheEntryListenerException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ serialized.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // No-op.
+ }
+ }
+}
[07/11] ignite git commit: Merge remote-tracking branch
'origin/ignite-1.4' into ignite-1.4
Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39dace45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39dace45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39dace45
Branch: refs/heads/master
Commit: 39dace45c81aef7cb913fcf4f98a7d71e34beebd
Parents: f0be45e a104087
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Sep 22 13:38:21 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Sep 22 13:38:21 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/cache/CacheAtomicityMode.java | 17 +--
.../processors/cache/GridCacheProcessor.java | 2 +-
...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 13 +-
4 files changed, 125 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
[05/11] ignite git commit: Merge remote-tracking branch
'origin/ignite-1.4' into ignite-1.4
Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1040872
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1040872
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1040872
Branch: refs/heads/master
Commit: a1040872f37cf4fd1dc20584c68307f420d0d3af
Parents: 33fe30d 50f75bd
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 12:59:14 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 12:59:14 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/cache/CacheAtomicityMode.java | 17 +++++------------
.../processors/cache/GridCacheProcessor.java | 2 +-
2 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
[10/11] ignite git commit: Added test.
Posted by vo...@apache.org.
Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1942d758
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1942d758
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1942d758
Branch: refs/heads/master
Commit: 1942d75856ab6d317b743de71b53a29abf81316a
Parents: ca2bce0
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 17:36:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 17:36:18 2015 +0300
----------------------------------------------------------------------
.../IgniteCachePutRetryAbstractSelfTest.java | 33 ++++++++++++++++++++
1 file changed, 33 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1942d758/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 943caeb..76f12c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -168,6 +168,13 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
/**
* @throws Exception If failed.
*/
+ public void testGetAndPut() throws Exception {
+ checkRetry(Test.GET_AND_PUT, TestMemoryMode.HEAP, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testPutStoreEnabled() throws Exception {
checkRetry(Test.PUT, TestMemoryMode.HEAP, true);
}
@@ -275,6 +282,29 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
break;
}
+ case GET_AND_PUT: {
+ for (int i = 0; i < keysCnt; i++)
+ cache.put(i, 0);
+
+ while (System.currentTimeMillis() < stopTime) {
+ Integer expOld = iter;
+
+ Integer val = ++iter;
+
+ for (int i = 0; i < keysCnt; i++) {
+ Integer old = cache.getAndPut(i, val);
+
+ assertTrue("Unexpected old value [old=" + old + ", exp=" + expOld + ']',
+ expOld.equals(old) || val.equals(old));
+ }
+
+ for (int i = 0; i < keysCnt; i++)
+ assertEquals(val, cache.get(i));
+ }
+
+ break;
+ }
+
case PUT_ALL: {
while (System.currentTimeMillis() < stopTime) {
Integer val = ++iter;
@@ -495,6 +525,9 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
PUT,
/** */
+ GET_AND_PUT,
+
+ /** */
PUT_ALL,
/** */
[04/11] ignite git commit: Added test.
Posted by vo...@apache.org.
Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33fe30da
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33fe30da
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33fe30da
Branch: refs/heads/master
Commit: 33fe30da620e4f08cee959104805f3527b597700
Parents: e51fb42
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 12:55:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 12:55:18 2015 +0300
----------------------------------------------------------------------
...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 13 +-
2 files changed, 119 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/33fe30da/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 66275b3..14417c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -21,12 +21,25 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+
/**
* Client-based discovery SPI test with failure detection timeout enabled.
*/
@@ -60,7 +73,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
/** {@inheritDoc} */
@Override protected TcpDiscoverySpi getDiscoverySpi() {
- return useTestSpi ? new TestTcpDiscoverySpi() : super.getDiscoverySpi();
+ return useTestSpi ? new TestTcpDiscoverySpi2() : super.getDiscoverySpi();
}
/**
@@ -117,16 +130,16 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
private void checkFailureThresholdWorkability() throws Exception {
useTestSpi = true;
- TestTcpDiscoverySpi firstSpi = null;
- TestTcpDiscoverySpi secondSpi = null;
+ TestTcpDiscoverySpi2 firstSpi = null;
+ TestTcpDiscoverySpi2 secondSpi = null;
try {
startServerNodes(2);
checkNodes(2, 0);
- firstSpi = (TestTcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi());
- secondSpi = (TestTcpDiscoverySpi)(G.ignite("server-1").configuration().getDiscoverySpi());
+ firstSpi = (TestTcpDiscoverySpi2)(G.ignite("server-0").configuration().getDiscoverySpi());
+ secondSpi = (TestTcpDiscoverySpi2)(G.ignite("server-1").configuration().getDiscoverySpi());
assert firstSpi.err == null;
@@ -157,9 +170,102 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
}
/**
+ * Test tries to provoke a scenario in which the client sends a reconnect message before the router failure is detected.
+ *
+ * @throws Exception If failed.
+ */
+ public void _testClientReconnectOnCoordinatorRouterFail() throws Exception {
+ startServerNodes(1);
+
+ Ignite srv = G.ignite("server-0");
+
+ final TcpDiscoveryNode srvNode = (TcpDiscoveryNode)srv.cluster().localNode();
+
+ final UUID srvNodeId = srvNode.id();
+
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+ clientIpFinder.setAddresses(
+ Collections.singleton("localhost:" + srvNode.discoveryPort() + ".." + (srvNode.discoveryPort() + 1)));
+
+ failureThreshold = 1000L;
+ netTimeout = 500L;
+
+ startClientNodes(1); // Client should connect to coordinator.
+
+ failureThreshold = 10_000L;
+ netTimeout = 5000L;
+
+ for (int i = 0; i < 2; i++) {
+ Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+ srvNodeIds.add(g.cluster().localNode().id());
+ }
+
+ checkNodes(3, 1);
+
+ final CountDownLatch latch = new CountDownLatch(3);
+
+ String nodes[] = {"server-1", "server-2", "client-0"};
+
+ final AtomicBoolean err = new AtomicBoolean();
+
+ for (String node : nodes) {
+ G.ignite(node).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ DiscoveryEvent disoEvt = (DiscoveryEvent)evt;
+
+ if (disoEvt.eventNode().id().equals(srvNodeId)) {
+ info("Expected node failed event: " + ((DiscoveryEvent) evt).eventNode());
+
+ latch.countDown();
+ }
+ else {
+ log.info("Unexpected node failed event: " + evt);
+
+ err.set(true);
+ }
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+ }
+
+ Thread.sleep(5000);
+
+ Ignite client = G.ignite("client-0");
+
+ UUID nodeId = client.cluster().localNode().id();
+
+ log.info("Fail coordinator: " + srvNodeId);
+
+ TestTcpDiscoverySpi srvSpi = (TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi();
+
+ srvSpi.pauseAll(false);
+
+ try {
+ Thread.sleep(2000);
+ }
+ finally {
+ srvSpi.simulateNodeFailure();
+ srvSpi.resumeAll();
+ }
+
+ try {
+ assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS));
+
+ assertFalse("Unexpected event, see log for details.", err.get());
+ assertEquals(nodeId, client.cluster().localNode().id());
+ }
+ finally {
+ srvSpi.resumeAll();
+ }
+ }
+
+ /**
*
*/
- private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ private static class TestTcpDiscoverySpi2 extends TcpDiscoverySpi {
/** */
private long readDelay;
http://git-wip-us.apache.org/repos/asf/ignite/blob/33fe30da/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index c86f06a..9fbf5b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -89,13 +89,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final AtomicInteger srvIdx = new AtomicInteger();
+ protected static final AtomicInteger srvIdx = new AtomicInteger();
/** */
private static final AtomicInteger clientIdx = new AtomicInteger();
/** */
- private static Collection<UUID> srvNodeIds;
+ protected static Collection<UUID> srvNodeIds;
/** */
private static Collection<UUID> clientNodeIds;
@@ -128,13 +128,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
private UUID nodeId;
/** */
- private TcpDiscoveryVmIpFinder clientIpFinder;
+ protected TcpDiscoveryVmIpFinder clientIpFinder;
/** */
private long joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT;
/** */
- private long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
+ protected long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
/** */
private boolean longSockTimeouts;
@@ -466,7 +466,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
@Override public void apply(Socket sock) {
try {
latch.await();
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@@ -2056,7 +2057,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/**
*
*/
- private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
/** */
private final Object mux = new Object();
[08/11] ignite git commit: ignite-1516 Optimize
GridH2AbstractKeyValueRow.getValue
Posted by vo...@apache.org.
ignite-1516 Optimize GridH2AbstractKeyValueRow.getValue
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72c3eef2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72c3eef2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72c3eef2
Branch: refs/heads/master
Commit: 72c3eef2aa31df4a68b46a8877809cc0f49c1368
Parents: 39dace4
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 13:51:09 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 13:51:09 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 8 +--
.../processors/cache/GridCacheMapEntry.java | 14 ++---
.../processors/cache/GridCacheProcessor.java | 6 +--
.../cache/GridCacheSwapEntryImpl.java | 31 +++++++++--
.../processors/cache/GridCacheSwapManager.java | 56 +++++++++++++-------
.../processors/query/h2/IgniteH2Indexing.java | 19 ++++---
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 49 ++++++++++-------
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 11 +++-
.../query/h2/opt/GridH2RowDescriptor.java | 5 ++
.../cache/CacheIndexStreamerTest.java | 33 +++++++++---
.../processors/cache/GridCacheSwapSelfTest.java | 4 +-
.../IgniteCacheWithIndexingTestSuite.java | 2 +
12 files changed, 158 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1fc94ec..ae987b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -805,9 +805,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (modes.offheap || modes.swap) {
GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
- GridCacheSwapEntry swapEntry = swapMgr.read(cacheKey, modes.offheap, modes.swap);
-
- cacheVal = swapEntry != null ? swapEntry.value() : null;
+ cacheVal = swapMgr.readValue(cacheKey, modes.offheap, modes.swap);
}
}
else
@@ -856,9 +854,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (offheap || swap) {
GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
- GridCacheSwapEntry swapEntry = swapMgr.read(key, offheap, swap);
-
- return swapEntry != null ? swapEntry.value() : null;
+ return swapMgr.readValue(key, offheap, swap);
}
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 961c792..4bf0aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -512,7 +512,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
else
- e = detached() ? cctx.swap().read(this, true, true, true) : cctx.swap().readAndRemove(this);
+ e = detached() ? cctx.swap().read(this, true, true, true, false) : cctx.swap().readAndRemove(this);
if (log.isDebugEnabled())
log.debug("Read swap entry [swapEntry=" + e + ", cacheEntry=" + this + ']');
@@ -2840,7 +2840,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
if (offheap || swap) {
- GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap);
+ GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap, true);
return e != null ? e.value() : null;
}
@@ -3581,14 +3581,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject val = rawGetOrUnmarshalUnlocked(false);
- if (val == null) {
- GridCacheSwapEntry swapEntry = cctx.swap().read(key, true, true);
-
- if (swapEntry == null)
- return null;
-
- return swapEntry.value();
- }
+ if (val == null)
+ val = cctx.swap().readValue(key, true, true);
return val;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9c325aa..e92ea57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2763,14 +2763,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
- GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes);
+ GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes, true);
CacheObject val = swapEntry.value();
- if (val == null)
- val = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), swapEntry.type(),
- swapEntry.valueBytes());
-
assert val != null;
qryMgr.remove(key, val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
index b7c66d3..6b1266f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
@@ -94,8 +94,6 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
long expireTime,
@Nullable IgniteUuid keyClsLdrId,
@Nullable IgniteUuid valClsLdrId) {
- assert ver != null;
-
this.valBytes = valBytes;
this.type = type;
this.ver = ver;
@@ -268,9 +266,36 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
/**
* @param arr Entry bytes.
+ * @param valOnly If {@code true} unmarshalls only entry value.
* @return Entry.
*/
- public static GridCacheSwapEntryImpl unmarshal(byte[] arr) {
+ public static GridCacheSwapEntryImpl unmarshal(byte[] arr, boolean valOnly) {
+ if (valOnly) {
+ long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
+
+ boolean verEx = UNSAFE.getByte(arr, off++) != 0;
+
+ off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
+
+ int arrLen = UNSAFE.getInt(arr, off);
+
+ off += 4;
+
+ byte type = UNSAFE.getByte(arr, off++);
+
+ byte[] valBytes = new byte[arrLen];
+
+ UNSAFE.copyMemory(arr, off, valBytes, BYTE_ARR_OFF, arrLen);
+
+ return new GridCacheSwapEntryImpl(ByteBuffer.wrap(valBytes),
+ type,
+ null,
+ 0L,
+ 0L,
+ null,
+ null);
+ }
+
long off = BYTE_ARR_OFF;
long ttl = UNSAFE.getLong(arr, off);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index d9a8b5c..2ab7b5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -569,6 +569,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @param entryLocked {@code True} if cache entry is locked.
* @param readOffheap Read offheap flag.
* @param readSwap Read swap flag.
+ * @param valOnly If {@code true} unmarshals only entry value.
* @return Value from swap or {@code null}.
* @throws IgniteCheckedException If failed.
*/
@@ -578,7 +579,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part,
boolean entryLocked,
boolean readOffheap,
- boolean readSwap)
+ boolean readSwap,
+ boolean valOnly)
throws IgniteCheckedException
{
assert readOffheap || readSwap;
@@ -605,7 +607,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.cache().metrics0().onOffHeapRead(bytes != null);
if (bytes != null)
- return swapEntry(unmarshalSwapEntry(bytes));
+ return swapEntry(unmarshalSwapEntry(bytes, valOnly));
}
if (!swapEnabled || !readSwap)
@@ -620,7 +622,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (bytes == null && lsnr != null)
return lsnr.entry;
- return bytes != null ? swapEntry(unmarshalSwapEntry(bytes)) : null;
+ return bytes != null ? swapEntry(unmarshalSwapEntry(bytes, valOnly)) : null;
}
finally {
if (lsnr != null)
@@ -706,7 +708,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (rmv != null) {
try {
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false));
if (entry == null)
return;
@@ -756,20 +758,22 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @param locked {@code True} if cache entry is locked.
* @param readOffheap Read offheap flag.
* @param readSwap Read swap flag.
+ * @param valOnly If {@code true} unmarshals only entry value.
* @return Read value.
* @throws IgniteCheckedException If read failed.
*/
@Nullable GridCacheSwapEntry read(GridCacheEntryEx entry,
boolean locked,
boolean readOffheap,
- boolean readSwap)
+ boolean readSwap,
+ boolean valOnly)
throws IgniteCheckedException
{
if (!offheapEnabled && !swapEnabled)
return null;
return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked,
- readOffheap, readSwap);
+ readOffheap, readSwap, valOnly);
}
/**
@@ -805,7 +809,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @return Read value.
* @throws IgniteCheckedException If read failed.
*/
- @Nullable public GridCacheSwapEntry read(KeyCacheObject key,
+ @Nullable public CacheObject readValue(KeyCacheObject key,
boolean readOffheap,
boolean readSwap)
throws IgniteCheckedException
@@ -815,7 +819,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part = cctx.affinity().partition(key);
- return read(key, key.valueBytes(cctx.cacheObjectContext()), part, false, readOffheap, readSwap);
+ GridCacheSwapEntry swapEntry = read(key,
+ key.valueBytes(cctx.cacheObjectContext()),
+ part,
+ false,
+ readOffheap,
+ readSwap,
+ true);
+
+ assert swapEntry == null || swapEntry.value() != null : swapEntry;
+
+ return swapEntry != null ? swapEntry.value() : null;
}
/**
@@ -865,7 +879,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.cache().metrics0().onOffHeapRemove();
}
- entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes));
+ entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes, false));
}
return entry;
@@ -972,7 +986,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (rmv != null) {
try {
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false));
if (entry == null)
return;
@@ -1078,7 +1092,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part);
if (lsnrs != null) {
- GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry));
+ GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry, false));
for (GridCacheSwapListener lsnr : lsnrs)
lsnr.onEntryUnswapped(part, key, e);
@@ -1132,7 +1146,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.cache().metrics0().onOffHeapRead(entryBytes != null);
if (entryBytes != null) {
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, false));
if (entry != null) {
cctx.queries().onUnswap(key, entry.value());
@@ -1165,7 +1179,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (entryBytes == null)
return false;
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, true));
if (entry == null)
return false;
@@ -2063,7 +2077,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
try {
for (Map.Entry<byte[], byte[]> e : iter) {
try {
- GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue());
+ GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue(), false);
IgniteUuid valLdrId = swapEntry.valueClassLoaderId();
@@ -2120,10 +2134,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
* @param bytes Bytes to unmarshal.
+ * @param valOnly If {@code true} unmarshalls only value.
* @return Unmarshalled entry.
*/
- private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes) {
- return GridCacheSwapEntryImpl.unmarshal(bytes);
+ private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes, boolean valOnly) {
+ return GridCacheSwapEntryImpl.unmarshal(bytes, valOnly);
}
/**
@@ -2169,7 +2184,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
@Override protected Map.Entry<byte[], GridCacheSwapEntry> onNext() throws IgniteCheckedException {
Map.Entry<byte[], byte[]> e = iter.nextX();
- GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue());
+ GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue(), false);
return F.t(e.getKey(), swapEntry(unmarshalled));
}
@@ -2446,6 +2461,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
abstract protected GridCloseableIterator<T1> partitionIterator(int part) throws IgniteCheckedException;
}
+ /**
+ *
+ */
private class GridVersionedMapEntry<K,V> implements Map.Entry<K,V>, GridCacheVersionAware {
/** */
private Map.Entry<byte[], byte[]> entry;
@@ -2474,7 +2492,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@Override public V getValue() {
try {
- GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue());
+ GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
swapEntry(e);
@@ -2487,7 +2505,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
- GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue());
+ GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
return e.version();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2af1386..8595187 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -71,7 +71,6 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
@@ -2108,6 +2107,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
private final GridUnsafeGuard guard;
+ /** */
+ private final boolean preferSwapVal;
+
/**
* @param type Type descriptor.
* @param schema Schema.
@@ -2136,6 +2138,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
keyType = DataType.getTypeFromClass(type.keyClass());
valType = DataType.getTypeFromClass(type.valueClass());
+
+ preferSwapVal = schema.ccfg.getMemoryMode() == CacheMemoryMode.OFFHEAP_TIERED;
}
/** {@inheritDoc} */
@@ -2263,15 +2267,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (cctx.isNear())
cctx = cctx.near().dht().context();
- GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true);
+ CacheObject v = cctx.swap().readValue(cctx.toCacheKeyObject(key), true, true);
- if (e == null)
+ if (v == null)
return null;
- CacheObject v = e.value();
-
- assert v != null : "swap must unmarshall it for us";
-
return v.value(cctx.cacheObjectContext(), false);
}
@@ -2312,5 +2312,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return new GridH2KeyValueRowOffheap(this, ptr);
}
+
+ /** {@inheritDoc} */
+ @Override public boolean preferSwapValue() {
+ return preferSwapVal;
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 4a16284..c11f541 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -130,20 +130,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
/**
* Atomically updates weak value.
*
- * @param upd New value.
- * @return {@code null} If update succeeded, unexpected value otherwise.
+ * @param valObj New value.
+ * @return New value if old value is empty, old value otherwise.
+ * @throws IgniteCheckedException If failed.
*/
- protected synchronized Value updateWeakValue(Value upd) {
+ protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException {
Value res = peekValue(VAL_COL);
if (res != null && !(res instanceof WeakValue))
return res;
+ Value upd = desc.wrap(valObj, desc.valueType());
+
setValue(VAL_COL, new WeakValue(upd));
notifyAll();
- return null;
+ return upd;
}
/**
@@ -188,21 +191,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
Value v;
if (col == VAL_COL) {
- v = syncValue(0);
+ v = peekValue(VAL_COL);
long start = 0;
int attempt = 0;
while ((v = WeakValue.unwrap(v)) == null) {
- v = getOffheapValue(VAL_COL);
+ if (!desc.preferSwapValue()) {
+ v = getOffheapValue(VAL_COL);
- if (v != null) {
- setValue(VAL_COL, v);
+ if (v != null) {
+ setValue(VAL_COL, v);
- if (peekValue(KEY_COL) == null)
- cache();
+ if (peekValue(KEY_COL) == null)
+ cache();
- return v;
+ return v;
+ }
}
Object k = getValue(KEY_COL).getObject();
@@ -213,16 +218,24 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
if (valObj != null) {
// Even if we've found valObj in swap, it may be some new value,
// while the needed value was already unswapped, so we have to recheck it.
- if ((v = WeakValue.unwrap(syncValue(0))) == null && (v = getOffheapValue(VAL_COL)) == null) {
- Value upd = desc.wrap(valObj, desc.valueType());
-
- v = updateWeakValue(upd);
-
- return v == null ? upd : v;
- }
+ if ((v = getOffheapValue(VAL_COL)) == null)
+ return updateWeakValue(valObj);
}
else {
// If nothing found in swap then we should be already unswapped.
+ if (desc.preferSwapValue()) {
+ v = getOffheapValue(VAL_COL);
+
+ if (v != null) {
+ setValue(VAL_COL, v);
+
+ if (peekValue(KEY_COL) == null)
+ cache();
+
+ return v;
+ }
+ }
+
v = syncValue(attempt);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index de31fe1..2dd9f25 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -216,12 +216,19 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
/** {@inheritDoc} */
@SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
- @Override protected synchronized Value updateWeakValue(Value upd) {
+ @Override protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException {
+ Value val = peekValue(VAL_COL);
+
+ if (val != null)
+ return val;
+
+ Value upd = desc.wrap(valObj, desc.valueType());
+
setValue(VAL_COL, upd);
notifyAll();
- return null;
+ return upd;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 0edd102..ed3ff7a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -111,4 +111,9 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<Grid
* @throws IgniteCheckedException If failed.
*/
public Value wrap(Object o, int type) throws IgniteCheckedException;
+
+ /**
+ * @return {@code True} if the swap value should be checked before the offheap value.
+ */
+ public boolean preferSwapValue();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
index 23f4e91..e6bf22b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -36,6 +36,8 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
@@ -45,7 +47,6 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
/** */
private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -60,14 +61,29 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testStreamer() throws Exception {
+ public void testStreamerAtomic() throws Exception {
+ checkStreamer(ATOMIC);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStreamerTx() throws Exception {
+ checkStreamer(TRANSACTIONAL);
+ }
+
+ /**
+ * @param atomicityMode Cache atomicity mode.
+ * @throws Exception If failed.
+ */
+ public void checkStreamer(CacheAtomicityMode atomicityMode) throws Exception {
final Ignite ignite = startGrid(0);
- final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration());
+ final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration(atomicityMode));
final AtomicBoolean stop = new AtomicBoolean();
- final int KEYS= 10_000;
+ final int KEYS = 10_000;
try {
IgniteInternalFuture streamerFut = GridTestUtils.runAsync(new Callable() {
@@ -118,14 +134,15 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
}
/**
+ * @param atomicityMode Cache atomicity mode.
* @return Cache configuration.
*/
- private CacheConfiguration cacheConfiguration() {
+ private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode) {
CacheConfiguration ccfg = new CacheConfiguration();
- ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setAtomicityMode(atomicityMode);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
- ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+ ccfg.setMemoryMode(OFFHEAP_TIERED);
ccfg.setOffHeapMaxMemory(0);
ccfg.setBackups(1);
ccfg.setIndexedTypes(Integer.class, String.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
index e0e6ff0..cd1fc93 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
@@ -244,12 +244,12 @@ public class GridCacheSwapSelfTest extends GridCommonAbstractTest {
}
/**
- * TODO: IGNITE-599.
- *
* @throws Exception If failed.
*/
public void testSwapEviction() throws Exception {
try {
+ fail("https://issues.apache.org/jira/browse/IGNITE-599");
+
final CountDownLatch evicted = new CountDownLatch(10);
startGrids(1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index f30f70e..550c69f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest;
+import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest;
import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvictTest;
@@ -63,6 +64,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
suite.addTestSuite(GridCacheOffheapIndexGetSelfTest.class);
suite.addTestSuite(GridCacheOffheapIndexEntryEvictTest.class);
+ suite.addTestSuite(CacheIndexStreamerTest.class);
suite.addTestSuite(CacheConfigurationP2PTest.class);
[09/11] ignite git commit: Check for WeakValue in
GridH2AbstractKeyValueRow.onUnswap
Posted by vo...@apache.org.
Check for WeakValue in GridH2AbstractKeyValueRow.onUnswap
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca2bce00
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca2bce00
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca2bce00
Branch: refs/heads/master
Commit: ca2bce00516142a1204fb9226c938174047e72d6
Parents: 72c3eef
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 15:04:27 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 15:04:27 2015 +0300
----------------------------------------------------------------------
.../processors/query/h2/opt/GridH2AbstractKeyValueRow.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca2bce00/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index c11f541..ca5442a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -119,7 +119,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
* @throws IgniteCheckedException If failed.
*/
public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException {
- if (peekValue(VAL_COL) != null)
+ Value val0 = peekValue(VAL_COL);
+
+ if (val0 != null && !(val0 instanceof WeakValue))
return;
setValue(VAL_COL, desc.wrap(val, desc.valueType()));
[06/11] ignite git commit: ignite-973 - fix
Posted by vo...@apache.org.
ignite-973 - fix
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0be45e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0be45e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0be45e3
Branch: refs/heads/master
Commit: f0be45e309f9a594334209a251c069f9ba3db120
Parents: e51fb42
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Sep 22 13:36:40 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Sep 22 13:36:40 2015 +0300
----------------------------------------------------------------------
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 3 +++
.../internal/processors/query/h2/opt/GridH2Table.java | 10 +++++++++-
.../internal/processors/cache/CacheIndexStreamerTest.java | 4 ++--
3 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0be45e3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 07c49a5..4a16284 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -119,6 +119,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
* @throws IgniteCheckedException If failed.
*/
public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException {
+ if (peekValue(VAL_COL) != null)
+ return;
+
setValue(VAL_COL, desc.wrap(val, desc.valueType()));
notifyAll();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0be45e3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 66241b4..bf318b2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -55,6 +55,8 @@ import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
+
/**
* H2 Table implementation.
*/
@@ -372,6 +374,12 @@ public class GridH2Table extends TableBase {
if (!del) {
GridH2Row old = pk.put(row); // Put to PK.
+ if (old instanceof GridH2AbstractKeyValueRow) { // Unswap value on replace.
+ GridH2AbstractKeyValueRow kvOld = (GridH2AbstractKeyValueRow)old;
+
+ kvOld.onUnswap(kvOld.getValue(VAL_COL), true);
+ }
+
int len = idxs.size();
int i = 1;
@@ -399,7 +407,7 @@ public class GridH2Table extends TableBase {
GridH2Row old = pk.remove(row);
if (old instanceof GridH2AbstractKeyValueRow) { // Unswap value.
- Value v = row.getValue(GridH2AbstractKeyValueRow.VAL_COL);
+ Value v = row.getValue(VAL_COL);
if (v != null)
((GridH2AbstractKeyValueRow)old).onUnswap(v.getObject(), true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0be45e3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
index 25c3b81..23f4e91 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
+import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -52,7 +52,7 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
- cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
+ cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
return cfg;
}
[03/11] ignite git commit: Cleaned documentation. Set ATOMIC mode as default using specific constant.
Posted by vo...@apache.org.
Cleaned documentation. Set ATOMIC mode as default using specific constant.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/50f75bd6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/50f75bd6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/50f75bd6
Branch: refs/heads/master
Commit: 50f75bd6111b5b9163391e4c0913ff5b696a2862
Parents: e51fb42
Author: Denis Magda <dm...@gridgain.com>
Authored: Tue Sep 22 11:44:22 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Sep 22 11:44:22 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/cache/CacheAtomicityMode.java | 17 +++++------------
.../processors/cache/GridCacheProcessor.java | 2 +-
2 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f75bd6/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
index 9e0f81e..92b5aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
@@ -33,11 +33,6 @@ public enum CacheAtomicityMode {
/**
* Specified fully {@code ACID}-compliant transactional cache behavior. See
* {@link Transaction} for more information about transactions.
- * <p>
- * This mode is currently the default cache atomicity mode. However, cache
- * atomicity mode will be changed to {@link #ATOMIC} starting from version {@code 5.2},
- * so it is recommended that desired atomicity mode is explicitly configured
- * instead of relying on default value.
*/
TRANSACTIONAL,
@@ -49,18 +44,16 @@ public enum CacheAtomicityMode {
* In addition to transactions and locking, one of the main differences in {@code ATOMIC} mode
* is that bulk writes, such as {@code putAll(...)}, {@code removeAll(...)}, and {@code transformAll(...)}
* methods, become simple batch operations which can partially fail. In case of partial
- * failure {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException} will be thrown which will contain a list of keys
- * for which the update failed. It is recommended that bulk writes are used whenever multiple keys
- * need to be inserted or updated in cache, as they reduce number of network trips and provide
- * better performance.
+ * failure {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException} will be thrown
+ * which will contain a list of keys for which the update failed. It is recommended that bulk writes are used
+ * whenever multiple keys need to be inserted or updated in cache, as they reduce number of network trips and
+ * provide better performance.
* <p>
* Note that even without locking and transactions, {@code ATOMIC} mode still provides
* full consistency guarantees across all cache nodes.
* <p>
* Also note that all data modifications in {@code ATOMIC} mode are guaranteed to be atomic
* and consistent with writes to the underlying persistent store, if one is configured.
- * <p>
- * This mode is currently implemented for {@link CacheMode#PARTITIONED} caches only.
*/
ATOMIC;
@@ -76,4 +69,4 @@ public enum CacheAtomicityMode {
@Nullable public static CacheAtomicityMode fromOrdinal(int ord) {
return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f75bd6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 7c16136..9c325aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -271,7 +271,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cfg.setRebalanceMode(ASYNC);
if (cfg.getAtomicityMode() == null)
- cfg.setAtomicityMode(ATOMIC);
+ cfg.setAtomicityMode(CacheConfiguration.DFLT_CACHE_ATOMICITY_MODE);
if (cfg.getWriteSynchronizationMode() == null)
cfg.setWriteSynchronizationMode(PRIMARY_SYNC);