You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/23 02:57:26 UTC

[10/14] ignite git commit: ignite-1516 Optimize GridH2AbstractKeyValueRow.getValue

ignite-1516 Optimize GridH2AbstractKeyValueRow.getValue


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72c3eef2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72c3eef2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72c3eef2

Branch: refs/heads/ignite-1171-debug
Commit: 72c3eef2aa31df4a68b46a8877809cc0f49c1368
Parents: 39dace4
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 13:51:09 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 13:51:09 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  8 +--
 .../processors/cache/GridCacheMapEntry.java     | 14 ++---
 .../processors/cache/GridCacheProcessor.java    |  6 +--
 .../cache/GridCacheSwapEntryImpl.java           | 31 +++++++++--
 .../processors/cache/GridCacheSwapManager.java  | 56 +++++++++++++-------
 .../processors/query/h2/IgniteH2Indexing.java   | 19 ++++---
 .../query/h2/opt/GridH2AbstractKeyValueRow.java | 49 ++++++++++-------
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  | 11 +++-
 .../query/h2/opt/GridH2RowDescriptor.java       |  5 ++
 .../cache/CacheIndexStreamerTest.java           | 33 +++++++++---
 .../processors/cache/GridCacheSwapSelfTest.java |  4 +-
 .../IgniteCacheWithIndexingTestSuite.java       |  2 +
 12 files changed, 158 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1fc94ec..ae987b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -805,9 +805,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 if (modes.offheap || modes.swap) {
                     GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
 
-                    GridCacheSwapEntry swapEntry = swapMgr.read(cacheKey, modes.offheap, modes.swap);
-
-                    cacheVal = swapEntry != null ? swapEntry.value() : null;
+                    cacheVal = swapMgr.readValue(cacheKey, modes.offheap, modes.swap);
                 }
             }
             else
@@ -856,9 +854,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (offheap || swap) {
             GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
 
-            GridCacheSwapEntry swapEntry = swapMgr.read(key, offheap, swap);
-
-            return swapEntry != null ? swapEntry.value() : null;
+            return swapMgr.readValue(key, offheap, swap);
         }
 
         return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 961c792..4bf0aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -512,7 +512,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     }
                 }
                 else
-                    e = detached() ? cctx.swap().read(this, true, true, true) : cctx.swap().readAndRemove(this);
+                    e = detached() ? cctx.swap().read(this, true, true, true, false) : cctx.swap().readAndRemove(this);
 
                 if (log.isDebugEnabled())
                     log.debug("Read swap entry [swapEntry=" + e + ", cacheEntry=" + this + ']');
@@ -2840,7 +2840,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
 
             if (offheap || swap) {
-                GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap);
+                GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap, true);
 
                 return e != null ? e.value() : null;
             }
@@ -3581,14 +3581,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         CacheObject val = rawGetOrUnmarshalUnlocked(false);
 
-        if (val == null) {
-            GridCacheSwapEntry swapEntry = cctx.swap().read(key, true, true);
-
-            if (swapEntry == null)
-                return null;
-
-            return swapEntry.value();
-        }
+        if (val == null)
+            val = cctx.swap().readValue(key, true, true);
 
         return val;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9c325aa..e92ea57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2763,14 +2763,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 try {
                     KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
 
-                    GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes);
+                    GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes, true);
 
                     CacheObject val = swapEntry.value();
 
-                    if (val == null)
-                        val = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), swapEntry.type(),
-                            swapEntry.valueBytes());
-
                     assert val != null;
 
                     qryMgr.remove(key, val);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
index b7c66d3..6b1266f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
@@ -94,8 +94,6 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
         long expireTime,
         @Nullable IgniteUuid keyClsLdrId,
         @Nullable IgniteUuid valClsLdrId) {
-        assert ver != null;
-
         this.valBytes = valBytes;
         this.type = type;
         this.ver = ver;
@@ -268,9 +266,36 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
 
     /**
      * @param arr Entry bytes.
+     * @param valOnly If {@code true} unmarshalls only entry value.
      * @return Entry.
      */
-    public static GridCacheSwapEntryImpl unmarshal(byte[] arr) {
+    public static GridCacheSwapEntryImpl unmarshal(byte[] arr, boolean valOnly) {
+        if (valOnly) {
+            long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
+
+            boolean verEx = UNSAFE.getByte(arr, off++) != 0;
+
+            off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
+
+            int arrLen = UNSAFE.getInt(arr, off);
+
+            off += 4;
+
+            byte type = UNSAFE.getByte(arr, off++);
+
+            byte[] valBytes = new byte[arrLen];
+
+            UNSAFE.copyMemory(arr, off, valBytes, BYTE_ARR_OFF, arrLen);
+
+            return new GridCacheSwapEntryImpl(ByteBuffer.wrap(valBytes),
+                type,
+                null,
+                0L,
+                0L,
+                null,
+                null);
+        }
+
         long off = BYTE_ARR_OFF;
 
         long ttl = UNSAFE.getLong(arr, off);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index d9a8b5c..2ab7b5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -569,6 +569,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @param entryLocked {@code True} if cache entry is locked.
      * @param readOffheap Read offheap flag.
      * @param readSwap Read swap flag.
+     * @param valOnly If {@code true} unmarshals only entry value.
      * @return Value from swap or {@code null}.
      * @throws IgniteCheckedException If failed.
      */
@@ -578,7 +579,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         int part,
         boolean entryLocked,
         boolean readOffheap,
-        boolean readSwap)
+        boolean readSwap,
+        boolean valOnly)
         throws IgniteCheckedException
     {
         assert readOffheap || readSwap;
@@ -605,7 +607,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     cctx.cache().metrics0().onOffHeapRead(bytes != null);
 
                 if (bytes != null)
-                    return swapEntry(unmarshalSwapEntry(bytes));
+                    return swapEntry(unmarshalSwapEntry(bytes, valOnly));
             }
 
             if (!swapEnabled || !readSwap)
@@ -620,7 +622,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             if (bytes == null && lsnr != null)
                 return lsnr.entry;
 
-            return bytes != null ? swapEntry(unmarshalSwapEntry(bytes)) : null;
+            return bytes != null ? swapEntry(unmarshalSwapEntry(bytes, valOnly)) : null;
         }
         finally {
             if (lsnr != null)
@@ -706,7 +708,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                 if (rmv != null) {
                     try {
-                        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+                        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false));
 
                         if (entry == null)
                             return;
@@ -756,20 +758,22 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @param locked {@code True} if cache entry is locked.
      * @param readOffheap Read offheap flag.
      * @param readSwap Read swap flag.
+     * @param valOnly If {@code true} unmarshals only entry value.
      * @return Read value.
      * @throws IgniteCheckedException If read failed.
      */
     @Nullable GridCacheSwapEntry read(GridCacheEntryEx entry,
         boolean locked,
         boolean readOffheap,
-        boolean readSwap)
+        boolean readSwap,
+        boolean valOnly)
         throws IgniteCheckedException
     {
         if (!offheapEnabled && !swapEnabled)
             return null;
 
         return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked,
-            readOffheap, readSwap);
+            readOffheap, readSwap, valOnly);
     }
 
     /**
@@ -805,7 +809,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return Read value.
      * @throws IgniteCheckedException If read failed.
      */
-    @Nullable public GridCacheSwapEntry read(KeyCacheObject key,
+    @Nullable public CacheObject readValue(KeyCacheObject key,
         boolean readOffheap,
         boolean readSwap)
         throws IgniteCheckedException
@@ -815,7 +819,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         int part = cctx.affinity().partition(key);
 
-        return read(key, key.valueBytes(cctx.cacheObjectContext()), part, false, readOffheap, readSwap);
+        GridCacheSwapEntry swapEntry = read(key,
+            key.valueBytes(cctx.cacheObjectContext()),
+            part,
+            false,
+            readOffheap,
+            readSwap,
+            true);
+
+        assert swapEntry == null || swapEntry.value() != null : swapEntry;
+
+        return swapEntry != null ? swapEntry.value() : null;
     }
 
     /**
@@ -865,7 +879,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     cctx.cache().metrics0().onOffHeapRemove();
             }
 
-            entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes));
+            entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes, false));
         }
 
         return entry;
@@ -972,7 +986,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                     if (rmv != null) {
                         try {
-                            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+                            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false));
 
                             if (entry == null)
                                 return;
@@ -1078,7 +1092,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part);
 
             if (lsnrs != null) {
-                GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry));
+                GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry, false));
 
                 for (GridCacheSwapListener lsnr : lsnrs)
                     lsnr.onEntryUnswapped(part, key, e);
@@ -1132,7 +1146,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             cctx.cache().metrics0().onOffHeapRead(entryBytes != null);
 
         if (entryBytes != null) {
-            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, false));
 
             if (entry != null) {
                 cctx.queries().onUnswap(key, entry.value());
@@ -1165,7 +1179,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (entryBytes == null)
             return false;
 
-        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, true));
 
         if (entry == null)
             return false;
@@ -2063,7 +2077,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                 try {
                     for (Map.Entry<byte[], byte[]> e : iter) {
                         try {
-                            GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue());
+                            GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue(), false);
 
                             IgniteUuid valLdrId = swapEntry.valueClassLoaderId();
 
@@ -2120,10 +2134,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param bytes Bytes to unmarshal.
+     * @param valOnly If {@code true} unmarshalls only value.
      * @return Unmarshalled entry.
      */
-    private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes) {
-        return GridCacheSwapEntryImpl.unmarshal(bytes);
+    private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes, boolean valOnly) {
+        return GridCacheSwapEntryImpl.unmarshal(bytes, valOnly);
     }
 
     /**
@@ -2169,7 +2184,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         @Override protected Map.Entry<byte[], GridCacheSwapEntry> onNext() throws IgniteCheckedException {
             Map.Entry<byte[], byte[]> e = iter.nextX();
 
-            GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue());
+            GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue(), false);
 
             return F.t(e.getKey(), swapEntry(unmarshalled));
         }
@@ -2446,6 +2461,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         abstract protected GridCloseableIterator<T1> partitionIterator(int part) throws IgniteCheckedException;
     }
 
+    /**
+     *
+     */
     private class GridVersionedMapEntry<K,V> implements Map.Entry<K,V>, GridCacheVersionAware {
         /** */
         private Map.Entry<byte[], byte[]> entry;
@@ -2474,7 +2492,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         /** {@inheritDoc} */
         @Override public V getValue() {
             try {
-                GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue());
+                GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
 
                 swapEntry(e);
 
@@ -2487,7 +2505,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         /** {@inheritDoc} */
         @Override public GridCacheVersion version() {
-            GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue());
+            GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
 
             return e.version();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2af1386..8595187 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -71,7 +71,6 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
@@ -2108,6 +2107,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         /** */
         private final GridUnsafeGuard guard;
 
+        /** */
+        private final boolean preferSwapVal;
+
         /**
          * @param type Type descriptor.
          * @param schema Schema.
@@ -2136,6 +2138,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             keyType = DataType.getTypeFromClass(type.keyClass());
             valType = DataType.getTypeFromClass(type.valueClass());
+
+            preferSwapVal = schema.ccfg.getMemoryMode() == CacheMemoryMode.OFFHEAP_TIERED;
         }
 
         /** {@inheritDoc} */
@@ -2263,15 +2267,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (cctx.isNear())
                 cctx = cctx.near().dht().context();
 
-            GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true);
+            CacheObject v = cctx.swap().readValue(cctx.toCacheKeyObject(key), true, true);
 
-            if (e == null)
+            if (v == null)
                 return null;
 
-            CacheObject v = e.value();
-
-            assert v != null : "swap must unmarshall it for us";
-
             return v.value(cctx.cacheObjectContext(), false);
         }
 
@@ -2312,5 +2312,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             return new GridH2KeyValueRowOffheap(this, ptr);
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean preferSwapValue() {
+            return preferSwapVal;
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 4a16284..c11f541 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -130,20 +130,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
     /**
      * Atomically updates weak value.
      *
-     * @param upd New value.
-     * @return {@code null} If update succeeded, unexpected value otherwise.
+     * @param valObj New value.
+     * @return New value if old value is empty, old value otherwise.
+     * @throws IgniteCheckedException If failed.
      */
-    protected synchronized Value updateWeakValue(Value upd) {
+    protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException {
         Value res = peekValue(VAL_COL);
 
         if (res != null && !(res instanceof WeakValue))
             return res;
 
+        Value upd = desc.wrap(valObj, desc.valueType());
+
         setValue(VAL_COL, new WeakValue(upd));
 
         notifyAll();
 
-        return null;
+        return upd;
     }
 
     /**
@@ -188,21 +191,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
             Value v;
 
             if (col == VAL_COL) {
-                v = syncValue(0);
+                v = peekValue(VAL_COL);
 
                 long start = 0;
                 int attempt = 0;
 
                 while ((v = WeakValue.unwrap(v)) == null) {
-                    v = getOffheapValue(VAL_COL);
+                    if (!desc.preferSwapValue()) {
+                        v = getOffheapValue(VAL_COL);
 
-                    if (v != null) {
-                        setValue(VAL_COL, v);
+                        if (v != null) {
+                            setValue(VAL_COL, v);
 
-                        if (peekValue(KEY_COL) == null)
-                            cache();
+                            if (peekValue(KEY_COL) == null)
+                                cache();
 
-                        return v;
+                            return v;
+                        }
                     }
 
                     Object k = getValue(KEY_COL).getObject();
@@ -213,16 +218,24 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
                         if (valObj != null) {
                             // Even if we've found valObj in swap, it is may be some new value,
                             // while the needed value was already unswapped, so we have to recheck it.
-                            if ((v = WeakValue.unwrap(syncValue(0))) == null && (v = getOffheapValue(VAL_COL)) == null) {
-                                Value upd = desc.wrap(valObj, desc.valueType());
-
-                                v = updateWeakValue(upd);
-
-                                return v == null ? upd : v;
-                            }
+                            if ((v = getOffheapValue(VAL_COL)) == null)
+                                return updateWeakValue(valObj);
                         }
                         else {
                             // If nothing found in swap then we should be already unswapped.
+                            if (desc.preferSwapValue()) {
+                                v = getOffheapValue(VAL_COL);
+
+                                if (v != null) {
+                                    setValue(VAL_COL, v);
+
+                                    if (peekValue(KEY_COL) == null)
+                                        cache();
+
+                                    return v;
+                                }
+                            }
+
                             v = syncValue(attempt);
                         }
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index de31fe1..2dd9f25 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -216,12 +216,19 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
 
     /** {@inheritDoc} */
     @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
-    @Override protected synchronized Value updateWeakValue(Value upd) {
+    @Override protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException {
+        Value val = peekValue(VAL_COL);
+
+        if (val != null)
+            return val;
+
+        Value upd = desc.wrap(valObj, desc.valueType());
+
         setValue(VAL_COL, upd);
 
         notifyAll();
 
-        return null;
+        return upd;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 0edd102..ed3ff7a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -111,4 +111,9 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<Grid
      * @throws IgniteCheckedException If failed.
      */
     public Value wrap(Object o, int type) throws IgniteCheckedException;
+
+    /**
+     * @return {@code True} if should check swap value before offheap.
+     */
+    public boolean preferSwapValue();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
index 23f4e91..e6bf22b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -36,6 +36,8 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
@@ -45,7 +47,6 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
     /** */
     private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -60,14 +61,29 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testStreamer() throws Exception {
+    public void testStreamerAtomic() throws Exception {
+        checkStreamer(ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStreamerTx() throws Exception {
+        checkStreamer(TRANSACTIONAL);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    public void checkStreamer(CacheAtomicityMode atomicityMode) throws Exception {
         final Ignite ignite = startGrid(0);
 
-        final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration());
+        final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration(atomicityMode));
 
         final AtomicBoolean stop = new AtomicBoolean();
 
-        final int KEYS= 10_000;
+        final int KEYS = 10_000;
 
         try {
             IgniteInternalFuture streamerFut = GridTestUtils.runAsync(new Callable() {
@@ -118,14 +134,15 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param atomicityMode Cache atomicity mode.
      * @return Cache configuration.
      */
-    private CacheConfiguration cacheConfiguration() {
+    private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode) {
         CacheConfiguration ccfg = new CacheConfiguration();
 
-        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setAtomicityMode(atomicityMode);
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
-        ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+        ccfg.setMemoryMode(OFFHEAP_TIERED);
         ccfg.setOffHeapMaxMemory(0);
         ccfg.setBackups(1);
         ccfg.setIndexedTypes(Integer.class, String.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
index e0e6ff0..cd1fc93 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
@@ -244,12 +244,12 @@ public class GridCacheSwapSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * TODO: IGNITE-599.
-     *
      * @throws Exception If failed.
      */
     public void testSwapEviction() throws Exception {
         try {
+            fail("https://issues.apache.org/jira/browse/IGNITE-599");
+
             final CountDownLatch evicted = new CountDownLatch(10);
 
             startGrids(1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index f30f70e..550c69f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest;
+import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvictTest;
@@ -63,6 +64,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
 
         suite.addTestSuite(GridCacheOffheapIndexGetSelfTest.class);
         suite.addTestSuite(GridCacheOffheapIndexEntryEvictTest.class);
+        suite.addTestSuite(CacheIndexStreamerTest.class);
 
         suite.addTestSuite(CacheConfigurationP2PTest.class);