You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/22 16:53:31 UTC
[08/11] ignite git commit: ignite-1516 Optimize GridH2AbstractKeyValueRow.getValue
ignite-1516 Optimize GridH2AbstractKeyValueRow.getValue
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72c3eef2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72c3eef2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72c3eef2
Branch: refs/heads/master
Commit: 72c3eef2aa31df4a68b46a8877809cc0f49c1368
Parents: 39dace4
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 13:51:09 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 13:51:09 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 8 +--
.../processors/cache/GridCacheMapEntry.java | 14 ++---
.../processors/cache/GridCacheProcessor.java | 6 +--
.../cache/GridCacheSwapEntryImpl.java | 31 +++++++++--
.../processors/cache/GridCacheSwapManager.java | 56 +++++++++++++-------
.../processors/query/h2/IgniteH2Indexing.java | 19 ++++---
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 49 ++++++++++-------
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 11 +++-
.../query/h2/opt/GridH2RowDescriptor.java | 5 ++
.../cache/CacheIndexStreamerTest.java | 33 +++++++++---
.../processors/cache/GridCacheSwapSelfTest.java | 4 +-
.../IgniteCacheWithIndexingTestSuite.java | 2 +
12 files changed, 158 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1fc94ec..ae987b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -805,9 +805,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (modes.offheap || modes.swap) {
GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
- GridCacheSwapEntry swapEntry = swapMgr.read(cacheKey, modes.offheap, modes.swap);
-
- cacheVal = swapEntry != null ? swapEntry.value() : null;
+ cacheVal = swapMgr.readValue(cacheKey, modes.offheap, modes.swap);
}
}
else
@@ -856,9 +854,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (offheap || swap) {
GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
- GridCacheSwapEntry swapEntry = swapMgr.read(key, offheap, swap);
-
- return swapEntry != null ? swapEntry.value() : null;
+ return swapMgr.readValue(key, offheap, swap);
}
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 961c792..4bf0aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -512,7 +512,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
else
- e = detached() ? cctx.swap().read(this, true, true, true) : cctx.swap().readAndRemove(this);
+ e = detached() ? cctx.swap().read(this, true, true, true, false) : cctx.swap().readAndRemove(this);
if (log.isDebugEnabled())
log.debug("Read swap entry [swapEntry=" + e + ", cacheEntry=" + this + ']');
@@ -2840,7 +2840,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
if (offheap || swap) {
- GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap);
+ GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap, true);
return e != null ? e.value() : null;
}
@@ -3581,14 +3581,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject val = rawGetOrUnmarshalUnlocked(false);
- if (val == null) {
- GridCacheSwapEntry swapEntry = cctx.swap().read(key, true, true);
-
- if (swapEntry == null)
- return null;
-
- return swapEntry.value();
- }
+ if (val == null)
+ val = cctx.swap().readValue(key, true, true);
return val;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9c325aa..e92ea57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2763,14 +2763,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
- GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes);
+ GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes, true);
CacheObject val = swapEntry.value();
- if (val == null)
- val = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), swapEntry.type(),
- swapEntry.valueBytes());
-
assert val != null;
qryMgr.remove(key, val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
index b7c66d3..6b1266f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
@@ -94,8 +94,6 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
long expireTime,
@Nullable IgniteUuid keyClsLdrId,
@Nullable IgniteUuid valClsLdrId) {
- assert ver != null;
-
this.valBytes = valBytes;
this.type = type;
this.ver = ver;
@@ -268,9 +266,36 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
/**
* @param arr Entry bytes.
+ * @param valOnly If {@code true} unmarshalls only entry value.
* @return Entry.
*/
- public static GridCacheSwapEntryImpl unmarshal(byte[] arr) {
+ public static GridCacheSwapEntryImpl unmarshal(byte[] arr, boolean valOnly) {
+ if (valOnly) {
+ long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
+
+ boolean verEx = UNSAFE.getByte(arr, off++) != 0;
+
+ off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
+
+ int arrLen = UNSAFE.getInt(arr, off);
+
+ off += 4;
+
+ byte type = UNSAFE.getByte(arr, off++);
+
+ byte[] valBytes = new byte[arrLen];
+
+ UNSAFE.copyMemory(arr, off, valBytes, BYTE_ARR_OFF, arrLen);
+
+ return new GridCacheSwapEntryImpl(ByteBuffer.wrap(valBytes),
+ type,
+ null,
+ 0L,
+ 0L,
+ null,
+ null);
+ }
+
long off = BYTE_ARR_OFF;
long ttl = UNSAFE.getLong(arr, off);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index d9a8b5c..2ab7b5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -569,6 +569,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @param entryLocked {@code True} if cache entry is locked.
* @param readOffheap Read offheap flag.
* @param readSwap Read swap flag.
+ * @param valOnly If {@code true} unmarshals only entry value.
* @return Value from swap or {@code null}.
* @throws IgniteCheckedException If failed.
*/
@@ -578,7 +579,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part,
boolean entryLocked,
boolean readOffheap,
- boolean readSwap)
+ boolean readSwap,
+ boolean valOnly)
throws IgniteCheckedException
{
assert readOffheap || readSwap;
@@ -605,7 +607,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.cache().metrics0().onOffHeapRead(bytes != null);
if (bytes != null)
- return swapEntry(unmarshalSwapEntry(bytes));
+ return swapEntry(unmarshalSwapEntry(bytes, valOnly));
}
if (!swapEnabled || !readSwap)
@@ -620,7 +622,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (bytes == null && lsnr != null)
return lsnr.entry;
- return bytes != null ? swapEntry(unmarshalSwapEntry(bytes)) : null;
+ return bytes != null ? swapEntry(unmarshalSwapEntry(bytes, valOnly)) : null;
}
finally {
if (lsnr != null)
@@ -706,7 +708,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (rmv != null) {
try {
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false));
if (entry == null)
return;
@@ -756,20 +758,22 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @param locked {@code True} if cache entry is locked.
* @param readOffheap Read offheap flag.
* @param readSwap Read swap flag.
+ * @param valOnly If {@code true} unmarshals only entry value.
* @return Read value.
* @throws IgniteCheckedException If read failed.
*/
@Nullable GridCacheSwapEntry read(GridCacheEntryEx entry,
boolean locked,
boolean readOffheap,
- boolean readSwap)
+ boolean readSwap,
+ boolean valOnly)
throws IgniteCheckedException
{
if (!offheapEnabled && !swapEnabled)
return null;
return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked,
- readOffheap, readSwap);
+ readOffheap, readSwap, valOnly);
}
/**
@@ -805,7 +809,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @return Read value.
* @throws IgniteCheckedException If read failed.
*/
- @Nullable public GridCacheSwapEntry read(KeyCacheObject key,
+ @Nullable public CacheObject readValue(KeyCacheObject key,
boolean readOffheap,
boolean readSwap)
throws IgniteCheckedException
@@ -815,7 +819,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part = cctx.affinity().partition(key);
- return read(key, key.valueBytes(cctx.cacheObjectContext()), part, false, readOffheap, readSwap);
+ GridCacheSwapEntry swapEntry = read(key,
+ key.valueBytes(cctx.cacheObjectContext()),
+ part,
+ false,
+ readOffheap,
+ readSwap,
+ true);
+
+ assert swapEntry == null || swapEntry.value() != null : swapEntry;
+
+ return swapEntry != null ? swapEntry.value() : null;
}
/**
@@ -865,7 +879,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.cache().metrics0().onOffHeapRemove();
}
- entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes));
+ entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes, false));
}
return entry;
@@ -972,7 +986,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (rmv != null) {
try {
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false));
if (entry == null)
return;
@@ -1078,7 +1092,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part);
if (lsnrs != null) {
- GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry));
+ GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry, false));
for (GridCacheSwapListener lsnr : lsnrs)
lsnr.onEntryUnswapped(part, key, e);
@@ -1132,7 +1146,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.cache().metrics0().onOffHeapRead(entryBytes != null);
if (entryBytes != null) {
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, false));
if (entry != null) {
cctx.queries().onUnswap(key, entry.value());
@@ -1165,7 +1179,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (entryBytes == null)
return false;
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, true));
if (entry == null)
return false;
@@ -2063,7 +2077,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
try {
for (Map.Entry<byte[], byte[]> e : iter) {
try {
- GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue());
+ GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue(), false);
IgniteUuid valLdrId = swapEntry.valueClassLoaderId();
@@ -2120,10 +2134,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
* @param bytes Bytes to unmarshal.
+ * @param valOnly If {@code true} unmarshalls only value.
* @return Unmarshalled entry.
*/
- private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes) {
- return GridCacheSwapEntryImpl.unmarshal(bytes);
+ private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes, boolean valOnly) {
+ return GridCacheSwapEntryImpl.unmarshal(bytes, valOnly);
}
/**
@@ -2169,7 +2184,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
@Override protected Map.Entry<byte[], GridCacheSwapEntry> onNext() throws IgniteCheckedException {
Map.Entry<byte[], byte[]> e = iter.nextX();
- GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue());
+ GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue(), false);
return F.t(e.getKey(), swapEntry(unmarshalled));
}
@@ -2446,6 +2461,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
abstract protected GridCloseableIterator<T1> partitionIterator(int part) throws IgniteCheckedException;
}
+ /**
+ *
+ */
private class GridVersionedMapEntry<K,V> implements Map.Entry<K,V>, GridCacheVersionAware {
/** */
private Map.Entry<byte[], byte[]> entry;
@@ -2474,7 +2492,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@Override public V getValue() {
try {
- GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue());
+ GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
swapEntry(e);
@@ -2487,7 +2505,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
- GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue());
+ GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
return e.version();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2af1386..8595187 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -71,7 +71,6 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
@@ -2108,6 +2107,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
private final GridUnsafeGuard guard;
+ /** */
+ private final boolean preferSwapVal;
+
/**
* @param type Type descriptor.
* @param schema Schema.
@@ -2136,6 +2138,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
keyType = DataType.getTypeFromClass(type.keyClass());
valType = DataType.getTypeFromClass(type.valueClass());
+
+ preferSwapVal = schema.ccfg.getMemoryMode() == CacheMemoryMode.OFFHEAP_TIERED;
}
/** {@inheritDoc} */
@@ -2263,15 +2267,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (cctx.isNear())
cctx = cctx.near().dht().context();
- GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true);
+ CacheObject v = cctx.swap().readValue(cctx.toCacheKeyObject(key), true, true);
- if (e == null)
+ if (v == null)
return null;
- CacheObject v = e.value();
-
- assert v != null : "swap must unmarshall it for us";
-
return v.value(cctx.cacheObjectContext(), false);
}
@@ -2312,5 +2312,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return new GridH2KeyValueRowOffheap(this, ptr);
}
+
+ /** {@inheritDoc} */
+ @Override public boolean preferSwapValue() {
+ return preferSwapVal;
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 4a16284..c11f541 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -130,20 +130,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
/**
* Atomically updates weak value.
*
- * @param upd New value.
- * @return {@code null} If update succeeded, unexpected value otherwise.
+ * @param valObj New value.
+ * @return New value if old value is empty, old value otherwise.
+ * @throws IgniteCheckedException If failed.
*/
- protected synchronized Value updateWeakValue(Value upd) {
+ protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException {
Value res = peekValue(VAL_COL);
if (res != null && !(res instanceof WeakValue))
return res;
+ Value upd = desc.wrap(valObj, desc.valueType());
+
setValue(VAL_COL, new WeakValue(upd));
notifyAll();
- return null;
+ return upd;
}
/**
@@ -188,21 +191,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
Value v;
if (col == VAL_COL) {
- v = syncValue(0);
+ v = peekValue(VAL_COL);
long start = 0;
int attempt = 0;
while ((v = WeakValue.unwrap(v)) == null) {
- v = getOffheapValue(VAL_COL);
+ if (!desc.preferSwapValue()) {
+ v = getOffheapValue(VAL_COL);
- if (v != null) {
- setValue(VAL_COL, v);
+ if (v != null) {
+ setValue(VAL_COL, v);
- if (peekValue(KEY_COL) == null)
- cache();
+ if (peekValue(KEY_COL) == null)
+ cache();
- return v;
+ return v;
+ }
}
Object k = getValue(KEY_COL).getObject();
@@ -213,16 +218,24 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
if (valObj != null) {
// Even if we've found valObj in swap, it is may be some new value,
// while the needed value was already unswapped, so we have to recheck it.
- if ((v = WeakValue.unwrap(syncValue(0))) == null && (v = getOffheapValue(VAL_COL)) == null) {
- Value upd = desc.wrap(valObj, desc.valueType());
-
- v = updateWeakValue(upd);
-
- return v == null ? upd : v;
- }
+ if ((v = getOffheapValue(VAL_COL)) == null)
+ return updateWeakValue(valObj);
}
else {
// If nothing found in swap then we should be already unswapped.
+ if (desc.preferSwapValue()) {
+ v = getOffheapValue(VAL_COL);
+
+ if (v != null) {
+ setValue(VAL_COL, v);
+
+ if (peekValue(KEY_COL) == null)
+ cache();
+
+ return v;
+ }
+ }
+
v = syncValue(attempt);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index de31fe1..2dd9f25 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -216,12 +216,19 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
/** {@inheritDoc} */
@SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
- @Override protected synchronized Value updateWeakValue(Value upd) {
+ @Override protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException {
+ Value val = peekValue(VAL_COL);
+
+ if (val != null)
+ return val;
+
+ Value upd = desc.wrap(valObj, desc.valueType());
+
setValue(VAL_COL, upd);
notifyAll();
- return null;
+ return upd;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 0edd102..ed3ff7a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -111,4 +111,9 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<Grid
* @throws IgniteCheckedException If failed.
*/
public Value wrap(Object o, int type) throws IgniteCheckedException;
+
+ /**
+ * @return {@code True} if should check swap value before offheap.
+ */
+ public boolean preferSwapValue();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
index 23f4e91..e6bf22b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -36,6 +36,8 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
@@ -45,7 +47,6 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
/** */
private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -60,14 +61,29 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testStreamer() throws Exception {
+ public void testStreamerAtomic() throws Exception {
+ checkStreamer(ATOMIC);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStreamerTx() throws Exception {
+ checkStreamer(TRANSACTIONAL);
+ }
+
+ /**
+ * @param atomicityMode Cache atomicity mode.
+ * @throws Exception If failed.
+ */
+ public void checkStreamer(CacheAtomicityMode atomicityMode) throws Exception {
final Ignite ignite = startGrid(0);
- final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration());
+ final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration(atomicityMode));
final AtomicBoolean stop = new AtomicBoolean();
- final int KEYS= 10_000;
+ final int KEYS = 10_000;
try {
IgniteInternalFuture streamerFut = GridTestUtils.runAsync(new Callable() {
@@ -118,14 +134,15 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
}
/**
+ * @param atomicityMode Cache atomicity mode.
* @return Cache configuration.
*/
- private CacheConfiguration cacheConfiguration() {
+ private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode) {
CacheConfiguration ccfg = new CacheConfiguration();
- ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setAtomicityMode(atomicityMode);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
- ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+ ccfg.setMemoryMode(OFFHEAP_TIERED);
ccfg.setOffHeapMaxMemory(0);
ccfg.setBackups(1);
ccfg.setIndexedTypes(Integer.class, String.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
index e0e6ff0..cd1fc93 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
@@ -244,12 +244,12 @@ public class GridCacheSwapSelfTest extends GridCommonAbstractTest {
}
/**
- * TODO: IGNITE-599.
- *
* @throws Exception If failed.
*/
public void testSwapEviction() throws Exception {
try {
+ fail("https://issues.apache.org/jira/browse/IGNITE-599");
+
final CountDownLatch evicted = new CountDownLatch(10);
startGrids(1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index f30f70e..550c69f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest;
+import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest;
import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvictTest;
@@ -63,6 +64,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
suite.addTestSuite(GridCacheOffheapIndexGetSelfTest.class);
suite.addTestSuite(GridCacheOffheapIndexEntryEvictTest.class);
+ suite.addTestSuite(CacheIndexStreamerTest.class);
suite.addTestSuite(CacheConfigurationP2PTest.class);