You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 16:11:48 UTC

[27/28] incubator-ignite git commit: ignite-545: merge from sprint-6

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java
index bf8cf0d..221bc39 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java
@@ -18,18 +18,28 @@
 package org.apache.ignite.cache.eviction.fifo;
 
 import org.apache.ignite.cache.eviction.*;
-import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+
 import org.jsr166.*;
 import org.jsr166.ConcurrentLinkedDeque8.*;
 
 import java.io.*;
 import java.util.*;
 
+import static org.apache.ignite.configuration.CacheConfiguration.*;
+
 /**
  * Eviction policy based on {@code First In First Out (FIFO)} algorithm and supports batch eviction.
  * <p>
- * The eviction starts when the cache size becomes {@code batchSize} elements greater than the maximum size.
+ * The eviction starts in the following cases:
+ * <ul>
+ *     <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li>
+ *     <li>
+ *         The size of cache entries in bytes becomes greater than the maximum memory size.
+ *         The size of a cache entry is calculated as the sum of its key size and value size.
+ *     </li>
+ * </ul>
+ * <b>Note:</b> Batch eviction is enabled only if the maximum memory limit isn't set ({@code maxMemSize == 0}).
  * {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}.
  * <p>
  * This implementation is very efficient since it does not create any additional
@@ -41,11 +51,17 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
     private static final long serialVersionUID = 0L;
 
     /** Maximum size. */
-    private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE;
+    private volatile int max = DFLT_CACHE_SIZE;
 
     /** Batch size. */
     private volatile int batchSize = 1;
 
+    /** Max memory size. */
+    private volatile long maxMemSize;
+
+    /** Memory size. */
+    private final LongAdder8 memSize = new LongAdder8();
+
     /** FIFO queue. */
     private final ConcurrentLinkedDeque8<EvictableEntry<K, V>> queue =
         new ConcurrentLinkedDeque8<>();
@@ -63,7 +79,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
      * @param max Maximum allowed size of cache before entry will start getting evicted.
      */
     public FifoEvictionPolicy(int max) {
-        A.ensure(max > 0, "max > 0");
+        A.ensure(max >= 0, "max >= 0");
 
         this.max = max;
     }
@@ -75,7 +91,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
      * @param batchSize Batch size.
      */
     public FifoEvictionPolicy(int max, int batchSize) {
-        A.ensure(max > 0, "max > 0");
+        A.ensure(max >= 0, "max >= 0");
         A.ensure(batchSize > 0, "batchSize > 0");
 
         this.max = max;
@@ -97,7 +113,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
      * @param max Maximum allowed size of cache before entry will start getting evicted.
      */
     @Override public void setMaxSize(int max) {
-        A.ensure(max > 0, "max > 0");
+        A.ensure(max >= 0, "max >= 0");
 
         this.max = max;
     }
@@ -119,6 +135,23 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
         return queue.size();
     }
 
+    /** {@inheritDoc} */
+    @Override public long getMaxMemorySize() {
+        return maxMemSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setMaxMemorySize(long maxMemSize) {
+        A.ensure(maxMemSize >= 0, "maxMemSize >= 0");
+
+        this.maxMemSize = maxMemSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getCurrentMemorySize() {
+        return memSize.longValue();
+    }
+
     /**
      * Gets read-only view on internal {@code FIFO} queue in proper order.
      *
@@ -141,8 +174,11 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
         else {
             Node<EvictableEntry<K, V>> node = entry.removeMeta();
 
-            if (node != null)
+            if (node != null) {
                 queue.unlinkx(node);
+
+                memSize.add(-entry.size());
+            }
         }
     }
 
@@ -173,11 +209,18 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
                         return false;
                     }
 
+                    memSize.add(entry.size());
+
                     return true;
                 }
                 // If node was unlinked by concurrent shrink() call, we must repeat the whole cycle.
                 else if (!entry.removeMeta(node))
                     return false;
+                else {
+                    memSize.add(-entry.size());
+
+                    return true;
+                }
             }
         }
 
@@ -189,38 +232,74 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
      * Shrinks FIFO queue to maximum allowed size.
      */
     private void shrink() {
+        long maxMem = this.maxMemSize;
+
+        if (maxMem > 0) {
+            long startMemSize = memSize.longValue();
+
+            if (startMemSize >= maxMem)
+                for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) {
+                    int size = shrink0();
+
+                    if (size == -1)
+                        break;
+
+                    i += size;
+                }
+        }
+
         int max = this.max;
 
-        int batchSize = this.batchSize;
+        if (max > 0) {
+            int startSize = queue.sizex();
+
+            // Shrink only if queue is full.
+            if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize))
+                for (int i = max; i < startSize && queue.sizex() > max; i++)
+                    if (shrink0() == -1)
+                        break;
+        }
+    }
 
-        int startSize = queue.sizex();
+    /**
+     * Tries to remove one item from queue.
+     *
+     * @return Number of bytes that were freed, or {@code -1} if the queue is empty.
+     */
+    private int shrink0() {
+        EvictableEntry<K, V> entry = queue.poll();
 
-        // Shrink only if queue is full.
-        if (startSize >= max + batchSize) {
-            for (int i = max; i < startSize && queue.sizex() > max; i++) {
-                EvictableEntry<K, V> entry = queue.poll();
+        if (entry == null)
+            return -1;
 
-                if (entry == null)
-                    break;
+        int size = 0;
 
-                Node<EvictableEntry<K, V>> meta = entry.removeMeta();
+        Node<EvictableEntry<K, V>> meta = entry.removeMeta();
 
-                if (meta != null && !entry.evict())
-                    touch(entry);
-            }
+        if (meta != null) {
+            size = entry.size();
+
+            memSize.add(-size);
+
+            if (!entry.evict())
+                touch(entry);
         }
+
+        return size;
     }
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeInt(max);
         out.writeInt(batchSize);
+        out.writeLong(maxMemSize);
     }
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         max = in.readInt();
         batchSize = in.readInt();
+        maxMemSize = in.readLong();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java
index 63a413e..793aa66 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java
@@ -63,4 +63,26 @@ public interface FifoEvictionPolicyMBean {
      */
     @MXBeanDescription("Current FIFO queue size.")
     public int getCurrentSize();
+
+    /**
+     * Gets maximum allowed cache size in bytes.
+     *
+     * @return maximum allowed cache size in bytes.
+     */
+    @MXBeanDescription("Maximum allowed cache size in bytes.")
+    public long getMaxMemorySize();
+
+    /**
+     * Sets maximum allowed cache size in bytes.
+     */
+    @MXBeanDescription("Set maximum allowed cache size in bytes.")
+    public void setMaxMemorySize(long maxMemSize);
+
+    /**
+     * Gets current queue size in bytes.
+     *
+     * @return current queue size in bytes.
+     */
+    @MXBeanDescription("Current FIFO queue size in bytes.")
+    public long getCurrentMemorySize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java
index 309d577..0be26c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java
@@ -18,26 +18,48 @@
 package org.apache.ignite.cache.eviction.lru;
 
 import org.apache.ignite.cache.eviction.*;
-import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+
 import org.jsr166.*;
 import org.jsr166.ConcurrentLinkedDeque8.*;
 
 import java.io.*;
 import java.util.*;
 
+import static org.apache.ignite.configuration.CacheConfiguration.*;
+
 /**
- * Eviction policy based on {@code Least Recently Used (LRU)} algorithm. This
- * implementation is very efficient since it is lock-free and does not
- * create any additional table-like data structures. The {@code LRU} ordering
- * information is maintained by attaching ordering metadata to cache entries.
+ * Eviction policy based on {@code Least Recently Used (LRU)} algorithm and supports batch eviction.
+ * <p>
+ * The eviction starts in the following cases:
+ * <ul>
+ *     <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li>
+ *     <li>
+ *         The size of cache entries in bytes becomes greater than the maximum memory size.
+ *         The size of a cache entry is calculated as the sum of its key size and value size.
+ *     </li>
+ * </ul>
+ * <b>Note:</b> Batch eviction is enabled only if the maximum memory limit isn't set ({@code maxMemSize == 0}).
+ * {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}.
+ * <p>
+ * This implementation is very efficient since it is lock-free and does not create any additional table-like
+ * data structures. The {@code LRU} ordering information is maintained by attaching ordering metadata to cache entries.
  */
 public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictionPolicyMBean, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** Maximum size. */
-    private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE;
+    private volatile int max = DFLT_CACHE_SIZE;
+
+    /** Batch size. */
+    private volatile int batchSize = 1;
+
+    /** Max memory size. */
+    private volatile long maxMemSize;
+
+    /** Memory size. */
+    private final LongAdder8 memSize = new LongAdder8();
 
     /** Queue. */
     private final ConcurrentLinkedDeque8<EvictableEntry<K, V>> queue =
@@ -56,7 +78,7 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
      * @param max Maximum allowed size of cache before entry will start getting evicted.
      */
     public LruEvictionPolicy(int max) {
-        A.ensure(max > 0, "max > 0");
+        A.ensure(max >= 0, "max >= 0");
 
         this.max = max;
     }
@@ -76,16 +98,45 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
      * @param max Maximum allowed size of cache before entry will start getting evicted.
      */
     @Override public void setMaxSize(int max) {
-        A.ensure(max > 0, "max > 0");
+        A.ensure(max >= 0, "max >= 0");
 
         this.max = max;
     }
 
     /** {@inheritDoc} */
+    @Override public int getBatchSize() {
+        return batchSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setBatchSize(int batchSize) {
+        A.ensure(batchSize > 0, "batchSize > 0");
+
+        this.batchSize = batchSize;
+    }
+
+    /** {@inheritDoc} */
     @Override public int getCurrentSize() {
         return queue.size();
     }
 
+    /** {@inheritDoc} */
+    @Override public long getMaxMemorySize() {
+        return maxMemSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setMaxMemorySize(long maxMemSize) {
+        A.ensure(maxMemSize >= 0, "maxMemSize >= 0");
+
+        this.maxMemSize = maxMemSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getCurrentMemorySize() {
+        return memSize.longValue();
+    }
+
     /**
      * Gets read-only view on internal {@code FIFO} queue in proper order.
      *
@@ -107,8 +158,11 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
         else {
             Node<EvictableEntry<K, V>> node = entry.removeMeta();
 
-            if (node != null)
+            if (node != null) {
                 queue.unlinkx(node);
+
+                memSize.add(-entry.size());
+            }
         }
     }
 
@@ -139,11 +193,18 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
                         return false;
                     }
 
+                    memSize.add(entry.size());
+
                     return true;
                 }
                 // If node was unlinked by concurrent shrink() call, we must repeat the whole cycle.
                 else if (!entry.removeMeta(node))
                     return false;
+                else {
+                    memSize.add(-entry.size());
+
+                    return true;
+                }
             }
         }
         else if (queue.unlinkx(node)) {
@@ -163,31 +224,73 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
      * Shrinks queue to maximum allowed size.
      */
     private void shrink() {
+        long maxMem = this.maxMemSize;
+
+        if (maxMem > 0) {
+            long startMemSize = memSize.longValue();
+
+            if (startMemSize >= maxMem)
+                for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) {
+                    int size = shrink0();
+
+                    if (size == -1)
+                        break;
+
+                    i += size;
+                }
+        }
+
         int max = this.max;
 
-        int startSize = queue.sizex();
+        if (max > 0) {
+            int startSize = queue.sizex();
 
-        for (int i = 0; i < startSize && queue.sizex() > max; i++) {
-            EvictableEntry<K, V> entry = queue.poll();
+            if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize))
+                for (int i = max; i < startSize && queue.sizex() > max; i++)
+                    if (shrink0() == -1)
+                        break;
+        }
+    }
 
-            if (entry == null)
-                break;
+    /**
+     * Tries to remove one item from queue.
+     *
+     * @return Number of bytes that were freed, or {@code -1} if the queue is empty.
+     */
+    private int shrink0() {
+        EvictableEntry<K, V> entry = queue.poll();
 
-            Node<EvictableEntry<K, V>> meta = entry.removeMeta();
+        if (entry == null)
+            return -1;
 
-            if (meta != null && !entry.evict())
+        int size = 0;
+
+        Node<EvictableEntry<K, V>> meta = entry.removeMeta();
+
+        if (meta != null) {
+            size = entry.size();
+
+            memSize.add(-size);
+
+            if (!entry.evict())
                 touch(entry);
         }
+
+        return size;
     }
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeInt(max);
+        out.writeInt(batchSize);
+        out.writeLong(maxMemSize);
     }
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         max = in.readInt();
+        batchSize = in.readInt();
+        maxMemSize = in.readLong();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java
index c243374..e17c057 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java
@@ -41,10 +41,48 @@ public interface LruEvictionPolicyMBean {
     public void setMaxSize(int max);
 
     /**
+     * Gets batch size.
+     *
+     * @return batch size.
+     */
+    @MXBeanDescription("Batch size.")
+    public int getBatchSize();
+
+    /**
+     * Sets batch size.
+     *
+     * @param batchSize Batch size.
+     */
+    @MXBeanDescription("Set batch size.")
+    public void setBatchSize(int batchSize);
+
+    /**
      * Gets current queue size.
      *
      * @return Current queue size.
      */
     @MXBeanDescription("Current queue size.")
     public int getCurrentSize();
+
+    /**
+     * Gets maximum allowed cache size in bytes.
+     *
+     * @return maximum allowed cache size in bytes.
+     */
+    @MXBeanDescription("Maximum allowed cache size in bytes.")
+    public long getMaxMemorySize();
+
+    /**
+     * Sets maximum allowed cache size in bytes.
+     */
+    @MXBeanDescription("Set maximum allowed cache size in bytes.")
+    public void setMaxMemorySize(long maxMemSize);
+
+    /**
+     * Gets current queue size in bytes.
+     *
+     * @return current queue size in bytes.
+     */
+    @MXBeanDescription("Current queue size in  bytes.")
+    public long getCurrentMemorySize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
index c88b31d..00a912f 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
@@ -18,20 +18,22 @@
 package org.apache.ignite.cache.eviction.random;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.eviction.*;
-import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
 import javax.cache.*;
 import java.io.*;
 
+import static org.apache.ignite.configuration.CacheConfiguration.*;
+
 /**
  * Cache eviction policy which will select random cache entry for eviction if cache
  * size exceeds the {@link #getMaxSize()} parameter. This implementation is
  * extremely light weight, lock-free, and does not create any data structures to maintain
  * any order for eviction.
  * <p>
- * Random eviction will provide the best performance over any key set in which every
+ * Random eviction will provide the best performance over any key set in which every
  * key has the same probability of being accessed.
  */
 public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomEvictionPolicyMBean, Externalizable {
@@ -39,7 +41,7 @@ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomE
     private static final long serialVersionUID = 0L;
 
     /** Maximum size. */
-    private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE;
+    private volatile int max = DFLT_CACHE_SIZE;
 
     /**
      * Constructs random eviction policy with all defaults.
@@ -87,7 +89,7 @@ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomE
 
         IgniteCache<K, V> cache = entry.unwrap(IgniteCache.class);
 
-        int size = cache.size();
+        int size = cache.localSize(CachePeekMode.ONHEAP);
 
         for (int i = max; i < size; i++) {
             Cache.Entry<K, V> e = cache.randomEntry();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
index 7965c97..b8b82fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
@@ -34,7 +34,15 @@ import static org.apache.ignite.configuration.CacheConfiguration.*;
 /**
  * Cache eviction policy which will select the minimum cache entry for eviction.
  * <p>
- * The eviction starts when the cache size becomes {@code batchSize} elements greater than the maximum size.
+ * The eviction starts in the following cases:
+ * <ul>
+ *     <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li>
+ *     <li>
+ *         The size of cache entries in bytes becomes greater than the maximum memory size.
+ *         The size of a cache entry is calculated as the sum of its key size and value size.
+ *     </li>
+ * </ul>
+ * <b>Note:</b> Batch eviction is enabled only if the maximum memory limit isn't set ({@code maxMemSize == 0}).
  * {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}.
  * <p>
  * Entries comparison based on {@link Comparator} instance if provided.
@@ -48,18 +56,24 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
     private static final long serialVersionUID = 0L;
 
     /** Maximum size. */
-    private volatile int max;
+    private volatile int max = DFLT_CACHE_SIZE;
 
     /** Batch size. */
     private volatile int batchSize = 1;
 
+    /** Max memory size. */
+    private volatile long maxMemSize;
+
+    /** Memory size. */
+    private final LongAdder8 memSize = new LongAdder8();
+
     /** Comparator. */
     private Comparator<Holder<K, V>> comp;
 
     /** Order. */
     private final AtomicLong orderCnt = new AtomicLong();
 
-    /** Backed sorted set. */
+    /** Backed sorted set. */
     private final GridConcurrentSkipListSetEx<K, V> set;
 
     /**
@@ -96,7 +110,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
      * @param comp Entries comparator.
      */
     public SortedEvictionPolicy(int max, int batchSize, @Nullable Comparator<EvictableEntry<K, V>> comp) {
-        A.ensure(max > 0, "max > 0");
+        A.ensure(max >= 0, "max >= 0");
         A.ensure(batchSize > 0, "batchSize > 0");
 
         this.max = max;
@@ -106,6 +120,16 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
     }
 
     /**
+     * Constructs sorted eviction policy with default maximum size and given entry comparator.
+     *
+     * @param comp Entries comparator.
+     */
+    public SortedEvictionPolicy(@Nullable Comparator<EvictableEntry<K, V>> comp) {
+        this.comp = comp == null ? new DefaultHolderComparator<K, V>() : new HolderComparator<>(comp);
+        this.set = new GridConcurrentSkipListSetEx<>(this.comp);
+    }
+
+    /**
      * Gets maximum allowed size of cache before entry will start getting evicted.
      *
      * @return Maximum allowed size of cache before entry will start getting evicted.
@@ -120,7 +144,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
      * @param max Maximum allowed size of cache before entry will start getting evicted.
      */
     @Override public void setMaxSize(int max) {
-        A.ensure(max > 0, "max > 0");
+        A.ensure(max >= 0, "max >= 0");
 
         this.max = max;
     }
@@ -142,12 +166,29 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
         return set.sizex();
     }
 
+    /** {@inheritDoc} */
+    @Override public long getMaxMemorySize() {
+        return maxMemSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setMaxMemorySize(long maxMemSize) {
+        A.ensure(maxMemSize >= 0, "maxMemSize >= 0");
+
+        this.maxMemSize = maxMemSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getCurrentMemorySize() {
+        return memSize.longValue();
+    }
+
     /**
-     * Gets read-only view of backed set in proper order.
+     * Gets read-only view of backed queue in proper order.
      *
-     * @return Read-only view of backed set.
+     * @return Read-only view of backed queue.
      */
-    public Collection<EvictableEntry<K, V>> set() {
+    public Collection<EvictableEntry<K, V>> queue() {
         Set<EvictableEntry<K, V>> cp = new LinkedHashSet<>();
 
         for (Holder<K, V> holder : set)
@@ -168,19 +209,22 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
         else {
             Holder<K, V> holder = entry.removeMeta();
 
-            if (holder != null)
+            if (holder != null) {
                 removeHolder(holder);
+
+                memSize.add(-entry.size());
+            }
         }
     }
 
     /**
      * @param entry Entry to touch.
-     * @return {@code True} if backed set has been changed by this call.
+     * @return {@code True} if backed queue has been changed by this call.
      */
     private boolean touch(EvictableEntry<K, V> entry) {
         Holder<K, V> holder = entry.meta();
 
-        // Entry has not been add yet to backed set..
+        // Entry has not been added to the backed queue yet.
         if (holder == null) {
             while (true) {
                 holder = new Holder<>(entry, orderCnt.incrementAndGet());
@@ -188,7 +232,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
                 set.add(holder);
 
                 if (entry.putMetaIfAbsent(holder) != null) {
-                    // Was concurrently added, need to remove it from set.
+                    // Was concurrently added, need to remove it from queue.
                     removeHolder(holder);
 
                     // Set has not been changed.
@@ -196,17 +240,24 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
                 }
                 else if (holder.order > 0) {
                     if (!entry.isCached()) {
-                        // Was concurrently evicted, need to remove it from set.
+                        // Was concurrently evicted, need to remove it from queue.
                         removeHolder(holder);
 
                         return false;
                     }
 
+                    memSize.add(entry.size());
+
                     return true;
                 }
                 // If holder was removed by concurrent shrink() call, we must repeat the whole cycle.
                 else if (!entry.removeMeta(holder))
                     return false;
+                else {
+                    memSize.add(-entry.size());
+
+                    return true;
+                }
             }
         }
 
@@ -215,34 +266,71 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
     }
 
     /**
-     * Shrinks backed set to maximum allowed size.
+     * Shrinks backed queue to maximum allowed size.
      */
     private void shrink() {
-        int max = this.max;
+        long maxMem = this.maxMemSize;
+
+        if (maxMem > 0) {
+            long startMemSize = memSize.longValue();
 
-        int batchSize = this.batchSize;
+            if (startMemSize >= maxMem)
+                for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) {
+                    int size = shrink0();
 
-        int startSize = set.sizex();
+                    if (size == -1)
+                        break;
 
-        if (startSize >= max + batchSize) {
-            for (int i = max; i < startSize && set.sizex() > max; i++) {
-                Holder<K, V> h = set.pollFirst();
+                    i += size;
+                }
+        }
 
-                if (h == null)
-                    break;
+        int max = this.max;
 
-                EvictableEntry<K, V> entry = h.entry;
+        if (max > 0) {
+            int startSize = set.sizex();
 
-                if (h.order > 0 && entry.removeMeta(h) && !entry.evict())
-                    touch(entry);
+            if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize)) {
+                for (int i = max; i < startSize && set.sizex() > max; i++) {
+                    if (shrink0() == -1)
+                        break;
+                }
             }
         }
     }
 
+    /**
+     * Tries to remove one item from queue.
+     *
+     * @return Number of bytes that were freed, or {@code -1} if the queue is empty.
+     */
+    private int shrink0() {
+        Holder<K, V> h = set.pollFirst();
+
+        if (h == null)
+            return -1;
+
+        int size = 0;
+
+        EvictableEntry<K, V> entry = h.entry;
+
+        if (h.order > 0 && entry.removeMeta(h)) {
+            size = entry.size();
+
+            memSize.add(-size);
+
+            if (!entry.evict())
+                touch(entry);
+        }
+
+        return size;
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeInt(max);
         out.writeInt(batchSize);
+        out.writeLong(maxMemSize);
         out.writeObject(comp);
     }
 
@@ -251,11 +339,12 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         max = in.readInt();
         batchSize = in.readInt();
+        maxMemSize = in.readLong();
         comp = (Comparator<Holder<K, V>>)in.readObject();
     }
 
     /**
-     * Removes holder from backed set and marks holder as removed.
+     * Removes holder from backed queue and marks holder as removed.
      *
      * @param holder Holder.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java
index bc696ff..7283453 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java
@@ -63,4 +63,26 @@ public interface SortedEvictionPolicyMBean {
      */
     @MXBeanDescription("Current sorted key set size.")
     public int getCurrentSize();
+
+    /**
+     * Gets maximum allowed cache size in bytes.
+     *
+     * @return maximum allowed cache size in bytes.
+     */
+    @MXBeanDescription("Maximum allowed cache size in bytes.")
+    public long getMaxMemorySize();
+
+    /**
+     * Sets maximum allowed cache size in bytes.
+     */
+    @MXBeanDescription("Set maximum allowed cache size in bytes.")
+    public void setMaxMemorySize(long maxMemSize);
+
+    /**
+     * Gets current sorted entries queue size in bytes.
+     *
+     * @return current sorted entries queue size in bytes.
+     */
+    @MXBeanDescription("Current sorted entries set size in bytes.")
+    public long getCurrentMemorySize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
index e66b32d..ef8fc49 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
@@ -20,9 +20,9 @@ package org.apache.ignite.cache.query;
 import org.apache.ignite.internal.processors.cache.query.*;
 
 /**
- * Cache query metrics used to obtain statistics on query. You can get metrics for
- * particular query via {@link CacheQuery#metrics()} method or accumulated metrics
- * for all queries via {@link GridCacheQueryManager#metrics()}.
+ * Cache query metrics used to obtain statistics on query. Metrics for a particular query
+ * can be obtained via {@link CacheQuery#metrics()} method or aggregated metrics for all queries
+ * via {@link CacheQuery#metrics()}.
  */
 public interface QueryMetrics {
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
index d018298..5bfdda1 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
@@ -94,6 +94,8 @@ public interface CacheStore<K, V> extends CacheLoader<K, V>, CacheWriter<K, V> {
      * @throws CacheWriterException If commit or rollback failed. Note that commit failure in some cases
      *      may bring cache transaction into {@link TransactionState#UNKNOWN} which will
      *      consequently cause all transacted entries to be invalidated.
+     * @deprecated Use {@link CacheStoreSessionListener} instead (refer to its JavaDoc for details).
      */
+    @Deprecated
     public void sessionEnd(boolean commit) throws CacheWriterException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
index 640d4a3..329e994 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
@@ -19,6 +19,7 @@ package org.apache.ignite.cache.store;
 
 import org.apache.ignite.resources.*;
 import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
 
 import java.util.*;
 
@@ -52,6 +53,27 @@ public interface CacheStoreSession {
     public boolean isWithinTransaction();
 
     /**
+     * Attaches the given object to this session.
+     * <p>
+     * An attached object may later be retrieved via the {@link #attachment()}
+     * method. Invoking this method causes any previous attachment to be
+     * discarded. To attach additional objects use {@link #properties()} map.
+     * <p>
+     * The current attachment may be discarded by attaching {@code null}.
+     *
+     * @param attachment The object to be attached (or {@code null} to discard current attachment).
+     * @return Previously attached object, if any.
+     */
+    @Nullable public <T> T attach(@Nullable Object attachment);
+
+    /**
+     * Retrieves the current attachment or {@code null} if there is no attachment.
+     *
+     * @return Currently attached object, if any.
+     */
+    @Nullable public <T> T attachment();
+
+    /**
      * Gets current session properties. You can add properties directly to the
      * returned map.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
new file mode 100644
index 0000000..1543bf9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.configuration.*;
+
+import javax.cache.configuration.*;
+import javax.sql.*;
+
+/**
+ * Cache store session listener that allows to implement callbacks
+ * for session lifecycle.
+ * <p>
+ * The most common use case for session listeners is database
+ * connection and transaction management. Store can be invoked one
+ * or several times during one session, depending on whether it's
+ * executed within cache transaction or not. In any case, you have
+ * to create a connection when session is started and commit it or
+ * rollback when session is finished.
+ * <p>
+ * Cache store session listener allows implementing this and other
+ * scenarios by providing two callback methods:
+ * <ul>
+ *     <li>
+ *         {@link #onSessionStart(CacheStoreSession)} - called
+ *         before any store operation within a session is invoked.
+ *     </li>
+ *     <li>
+ *         {@link #onSessionEnd(CacheStoreSession, boolean)} - called
+ *         after all operations within a session are invoked.
+ *     </li>
+ * </ul>
+ * <h2>Implementations</h2>
+ * Ignite provides several out-of-the-box implementations
+ * of session listener (refer to individual JavaDocs for more
+ * details):
+ * <ul>
+ *     <li>
+ *         {@link CacheJdbcStoreSessionListener} - JDBC-based session
+ *         listener. For each session it gets a new JDBC connection from
+ *         provided {@link DataSource} and commits (or rolls back) it
+ *         when session ends.
+ *     </li>
+ *     <li>
+ *         {@ignitelink org.apache.ignite.cache.store.spring.CacheSpringStoreSessionListener} -
+ *         session listener based on Spring transaction management.
+ *         It starts a new DB transaction for each session and commits
+ *         (or rolls back) it when session ends. If there is no ongoing
+ *         cache transaction, this listener is no-op.
+ *     </li>
+ *     <li>
+ *         {@ignitelink org.apache.ignite.cache.store.hibernate.CacheHibernateStoreSessionListener} -
+ *         Hibernate-based session listener. It creates a new Hibernate
+ *         session for each Ignite session. If there is an ongoing cache
+ *         transaction, a corresponding Hibernate transaction is created
+ *         as well.
+ *     </li>
+ * </ul>
+ * <h2>Configuration</h2>
+ * There are two ways to configure a session listener:
+ * <ul>
+ *     <li>
+ *         Provide a global listener for all caches via
+ *         {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+ *         configuration property. This will be called for any store
+ *         session, not depending on what caches participate in
+ *         transaction.
+ *     </li>
+ *     <li>
+ *         Provide a listener for a particular cache via
+ *         {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+ *         configuration property. This will be called only if the
+ *         cache participates in transaction.
+ *     </li>
+ * </ul>
+ * For example, here is how global {@link CacheJdbcStoreSessionListener}
+ * can be configured in Spring XML configuration file:
+ * <pre name="code" class="xml">
+ * &lt;bean class="org.apache.ignite.configuration.IgniteConfiguration"&gt;
+ *     ...
+ *
+ *     &lt;property name="CacheStoreSessionListenerFactories"&gt;
+ *         &lt;list&gt;
+ *             &lt;bean class="javax.cache.configuration.FactoryBuilder$SingletonFactory"&gt;
+ *                 &lt;constructor-arg&gt;
+ *                     &lt;bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener"&gt;
+ *                         &lt;!-- Inject external data source. --&gt;
+ *                         &lt;property name="dataSource" ref="jdbc-data-source"/&gt;
+ *                     &lt;/bean&gt;
+ *                 &lt;/constructor-arg&gt;
+ *             &lt;/bean&gt;
+ *         &lt;/list&gt;
+ *     &lt;/property&gt;
+ * &lt;/bean&gt;
+ * </pre>
+ */
+public interface CacheStoreSessionListener {
+    /**
+     * On session start callback.
+     * <p>
+     * Called before any store operation within a session is invoked.
+     *
+     * @param ses Current session.
+     */
+    public void onSessionStart(CacheStoreSession ses);
+
+    /**
+     * On session end callback.
+     * <p>
+     * Called after all operations within a session are invoked.
+     *
+     * @param ses Current session.
+     * @param commit {@code True} if persistence store transaction
+     *      should commit, {@code false} for rollback.
+     */
+    public void onSessionEnd(CacheStoreSession ses, boolean commit);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java
new file mode 100644
index 0000000..a20e535
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lifecycle.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import javax.sql.*;
+import java.sql.*;
+
+/**
+ * Cache store session listener based on JDBC connection.
+ * <p>
+ * For each session this listener gets a new JDBC connection
+ * from provided {@link DataSource} and commits (or rolls
+ * back) it when session ends.
+ * <p>
+ * The connection is saved as a store session
+ * {@link CacheStoreSession#attachment() attachment}.
+ * The listener guarantees that the connection will be
+ * available for any store operation. If there is an
+ * ongoing cache transaction, all operations within this
+ * transaction will be committed or rolled back only when
+ * session ends.
+ * <p>
+ * As an example, here is how the {@link CacheStore#write(Cache.Entry)}
+ * method can be implemented if {@link CacheJdbcStoreSessionListener}
+ * is configured:
+ * <pre name="code" class="java">
+ * private static class Store extends CacheStoreAdapter&lt;Integer, Integer&gt; {
+ *     &#64;CacheStoreSessionResource
+ *     private CacheStoreSession ses;
+ *
+ *     &#64;Override public void write(Cache.Entry&lt;? extends Integer, ? extends Integer&gt; entry) throws CacheWriterException {
+ *         // Get connection from the current session.
+ *         Connection conn = ses.attachment();
+ *
+ *         // Execute update SQL query.
+ *         try {
+ *             conn.createStatement().executeUpdate("...");
+ *         }
+ *         catch (SQLException e) {
+ *             throw new CacheWriterException("Failed to update the store.", e);
+ *         }
+ *     }
+ * }
+ * </pre>
+ * JDBC connection will be automatically created by the listener
+ * at the start of the session and closed when it ends.
+ */
+public class CacheJdbcStoreSessionListener implements CacheStoreSessionListener, LifecycleAware {
+    /** Data source. */
+    private DataSource dataSrc;
+
+    /**
+     * Sets data source.
+     * <p>
+     * This is a required parameter. If data source is not set,
+     * exception will be thrown on startup.
+     *
+     * @param dataSrc Data source.
+     */
+    public void setDataSource(DataSource dataSrc) {
+        this.dataSrc = dataSrc;
+    }
+
+    /**
+     * Gets data source.
+     *
+     * @return Data source.
+     */
+    public DataSource getDataSource() {
+        return dataSrc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteException {
+        if (dataSrc == null)
+            throw new IgniteException("Data source is required by " + getClass().getSimpleName() + '.');
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws IgniteException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionStart(CacheStoreSession ses) {
+        if (ses.attachment() == null) {
+            try {
+                Connection conn = dataSrc.getConnection();
+
+                conn.setAutoCommit(false);
+
+                ses.attach(conn);
+            }
+            catch (SQLException e) {
+                throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+        Connection conn = ses.attach(null); // Detach whatever connection was attached to the session (null if none).
+
+        if (conn != null) {
+            try {
+                if (commit)
+                    conn.commit();
+                else
+                    conn.rollback();
+            }
+            catch (SQLException e) {
+                throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e);
+            }
+            finally {
+                U.closeQuiet(conn); // Runs even if commit/rollback failed.
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
index 9cb5d3d..85fd08a 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.cluster;
 
 import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -33,7 +35,7 @@ import java.util.*;
  * You can use cluster node attributes to provide static information about a node.
  * This information is initialized once within a cluster, during the node startup, and
  * remains the same throughout the lifetime of a node. Use
- * {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method to initialize your custom
+ * {@link IgniteConfiguration#getUserAttributes()} method to initialize your custom
  * node attributes at startup. Here is an example of how to assign an attribute to a node at startup:
  * <pre name="code" class="xml">
  * &lt;bean class="org.apache.ignite.configuration.IgniteConfiguration">
@@ -114,7 +116,7 @@ public interface ClusterNode {
 
     /**
      * Gets a node attribute. Attributes are assigned to nodes at startup
-     * via {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method.
+     * via {@link IgniteConfiguration#getUserAttributes()} method.
      * <p>
      * The system adds the following attributes automatically:
      * <ul>
@@ -149,7 +151,7 @@ public interface ClusterNode {
 
     /**
      * Gets all node attributes. Attributes are assigned to nodes at startup
-     * via {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method.
+     * via {@link IgniteConfiguration#getUserAttributes()} method.
      * <p>
      * The system adds the following attributes automatically:
      * <ul>
@@ -167,7 +169,7 @@ public interface ClusterNode {
     /**
      * Gets collection of addresses this node is known by.
      * <p>
-     * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use that
+     * If {@link IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use that
      * address for all communications and returned collection will contain only that address.
      * If it is {@code null} then local wildcard address will be used, and Ignite
      * will make the best effort to supply all addresses of that node in returned collection.
@@ -179,12 +181,12 @@ public interface ClusterNode {
     /**
      * Gets collection of host names this node is known by.
      * <p>
-     * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use
+     * If {@link IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use
      * the host name of that resolved address for all communications and
      * returned collection will contain only that host name.
      * If that host name can not be resolved then ip address returned by method {@link #addresses()} is used.
      * <p>
-     * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value is {@code null} then local wildcard address will be used,
+     * If {@link IgniteConfiguration#getLocalHost()} value is {@code null} then local wildcard address will be used,
      * and this method returns host names of all addresses of that node.
      *
      * @return Collection of host names.
@@ -238,9 +240,17 @@ public interface ClusterNode {
     public boolean isDaemon();
 
     /**
-     * Tests whether or not this node is a client node.
+     * Tests whether or not this node is connected to cluster as a client.
+     * <p>
+     * Do not confuse client in terms of
+     * discovery {@link DiscoverySpi#isClientMode()} and client in terms of cache
+     * {@link IgniteConfiguration#isClientMode()}. Cache clients cannot carry data,
+     * while topology clients connect to topology in a different way.
      *
      * @return {@code True} if this node is a client node, {@code false} otherwise.
+     * @see IgniteConfiguration#isClientMode()
+     * @see Ignition#isClientMode()
+     * @see DiscoverySpi#isClientMode()
      */
     public boolean isClient();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index df6b2ee..1aa4fd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -25,7 +25,6 @@ import org.apache.ignite.cache.eviction.*;
 import org.apache.ignite.cache.query.annotations.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.*;
@@ -145,9 +144,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Default value for 'readFromBackup' flag. */
     public static final boolean DFLT_READ_FROM_BACKUP = true;
 
-    /** Filter that accepts only server nodes. */
-    public static final IgnitePredicate<ClusterNode> SERVER_NODES = new IgniteServerNodePredicate();
-
     /** Filter that accepts all nodes. */
     public static final IgnitePredicate<ClusterNode> ALL_NODES = new IgniteAllNodesPredicate();
 
@@ -316,6 +312,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Cache topology validator. */
     private TopologyValidator topValidator;
 
+    /** Cache store session listeners. */
+    private Factory<? extends CacheStoreSessionListener>[] storeSesLsnrs;
+
     /** Empty constructor (all values are initialized to their defaults). */
     public CacheConfiguration() {
         /* No-op. */
@@ -389,6 +388,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         sqlOnheapRowCacheSize = cc.getSqlOnheapRowCacheSize();
         startSize = cc.getStartSize();
         storeFactory = cc.getCacheStoreFactory();
+        storeSesLsnrs = cc.getCacheStoreSessionListenerFactories();
         swapEnabled = cc.isSwapEnabled();
         tmLookupClsName = cc.getTransactionManagerLookupClassName();
         topValidator = cc.getTopologyValidator();
@@ -1664,7 +1664,18 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         A.ensure(indexedTypes == null || (indexedTypes.length & 1) == 0,
             "Number of indexed types is expected to be even. Refer to method javadoc for details.");
 
-        this.indexedTypes = indexedTypes;
+        if (indexedTypes != null) {
+            int len = indexedTypes.length;
+
+            Class<?>[] newIndexedTypes = new Class<?>[len];
+
+            for (int i = 0; i < len; i++)
+                newIndexedTypes[i] = U.box(indexedTypes[i]);
+
+            this.indexedTypes = newIndexedTypes;
+        }
+        else
+            this.indexedTypes = null;
 
         return this;
     }
@@ -1734,30 +1745,37 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         return this;
     }
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CacheConfiguration.class, this);
+    /**
+     * Gets cache store session listener factories.
+     *
+     * @return Cache store session listener factories.
+     * @see CacheStoreSessionListener
+     */
+    public Factory<? extends CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() {
+        return storeSesLsnrs;
     }
 
     /**
-     * Filter that accepts only server nodes.
+     * Sets cache store session listener factories.
+     * <p>
+     * These listeners override global listeners provided in
+     * {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+     * configuration property.
+     *
+     * @param storeSesLsnrs Cache store session listener factories.
+     * @return {@code this} for chaining.
+     * @see CacheStoreSessionListener
      */
-    public static class IgniteServerNodePredicate implements IgnitePredicate<ClusterNode> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        @Override public boolean apply(ClusterNode n) {
-            Boolean attr = n.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
-
-            return attr != null && !attr;
-        }
+    public CacheConfiguration setCacheStoreSessionListenerFactories(
+        Factory<? extends CacheStoreSessionListener>... storeSesLsnrs) {
+        this.storeSesLsnrs = storeSesLsnrs;
 
-        @Override public boolean equals(Object obj) {
-            if (obj == null)
-                return false;
+        return this;
+    }
 
-            return obj.getClass().equals(this.getClass());
-        }
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheConfiguration.class, this);
     }
 
     /**
@@ -1767,10 +1785,12 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         /** */
         private static final long serialVersionUID = 0L;
 
+        /** {@inheritDoc} */
         @Override public boolean apply(ClusterNode clusterNode) {
             return true;
         }
 
+        /** {@inheritDoc} */
         @Override public boolean equals(Object obj) {
             if (obj == null)
                 return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index ebe2b8e..2d36c7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.configuration;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.events.*;
@@ -52,6 +53,7 @@ import org.apache.ignite.spi.loadbalancing.roundrobin.*;
 import org.apache.ignite.spi.swapspace.*;
 import org.apache.ignite.spi.swapspace.file.*;
 
+import javax.cache.configuration.*;
 import javax.cache.event.*;
 import javax.cache.expiry.*;
 import javax.cache.integration.*;
@@ -334,9 +336,6 @@ public class IgniteConfiguration {
     /** Cache configurations. */
     private CacheConfiguration[] cacheCfg;
 
-    /** Client cache configurations. */
-    private NearCacheConfiguration[] nearCacheCfg;
-
     /** Client mode flag. */
     private Boolean clientMode;
 
@@ -398,6 +397,9 @@ public class IgniteConfiguration {
     /** User's class loader. */
     private ClassLoader classLdr;
 
+    /** Cache store session listeners. */
+    private Factory<CacheStoreSessionListener>[] storeSesLsnrs;
+
     /**
      * Creates valid grid configuration with all default values.
      */
@@ -478,6 +480,7 @@ public class IgniteConfiguration {
         segResolvers = cfg.getSegmentationResolvers();
         sndRetryCnt = cfg.getNetworkSendRetryCount();
         sndRetryDelay = cfg.getNetworkSendRetryDelay();
+        storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories();
         svcCfgs = cfg.getServiceConfiguration();
         sysPoolSize = cfg.getSystemThreadPoolSize();
         timeSrvPortBase = cfg.getTimeServerPortBase();
@@ -1823,9 +1826,11 @@ public class IgniteConfiguration {
     }
 
     /**
-     * Gets client mode flag.
+     * Gets client mode flag. Client node cannot hold data in the caches. It's recommended to use
+     * {@link DiscoverySpi} in client mode if this property is {@code true}.
      *
      * @return Client mode flag.
+     * @see TcpDiscoverySpi#setForceServerMode(boolean)
      */
     public Boolean isClientMode() {
         return clientMode;
@@ -2188,15 +2193,21 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Gets plugin configurations.
+     *
      * @return Plugin configurations.
+     * @see PluginProvider
      */
     public PluginConfiguration[] getPluginConfigurations() {
         return pluginCfgs;
     }
 
     /**
+     * Sets plugin configurations.
+     *
      * @param pluginCfgs Plugin configurations.
      * @return {@code this} for chaining.
+     * @see PluginProvider
      */
     public IgniteConfiguration setPluginConfigurations(PluginConfiguration... pluginCfgs) {
         this.pluginCfgs = pluginCfgs;
@@ -2242,6 +2253,35 @@ public class IgniteConfiguration {
         return classLdr;
     }
 
+    /**
+     * Gets cache store session listener factories.
+     *
+     * @return Cache store session listener factories.
+     * @see CacheStoreSessionListener
+     */
+    public Factory<CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() {
+        return storeSesLsnrs;
+    }
+
+    /**
+     * Sets cache store session listener factories.
+     * <p>
+     * These are global store session listeners, so they are applied to
+     * all caches. If you need to override listeners for a
+     * particular cache, use {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+     * configuration property.
+     *
+     * @param storeSesLsnrs Cache store session listener factories.
+     * @return {@code this} for chaining.
+     * @see CacheStoreSessionListener
+     */
+    public IgniteConfiguration setCacheStoreSessionListenerFactories(
+        Factory<CacheStoreSessionListener>... storeSesLsnrs) {
+        this.storeSesLsnrs = storeSesLsnrs;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
new file mode 100644
index 0000000..5a65bdb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.*;
+
+/**
+ * Provides ability to execute IGFS code in a context of a specific user.
+ */
+public abstract class IgfsUserContext {
+    /** Thread local to hold the current user context. */
+    private static final ThreadLocal<String> userStackThreadLocal = new ThreadLocal<>();
+
+    /**
+     * Executes given callable in the given user context.
+     * The main contract of this method is that {@link #currentUser()}, when invoked
+     * inside the closure, always returns the {@code user} the closure is executed with.
+     * @param user the user name to invoke closure on behalf of.
+     * @param clo the closure to execute
+     * @param <T> The type of closure result.
+     * @return the result of closure execution.
+     * @throws IllegalArgumentException if the user name is null or an empty String (the closure is not validated).
+     */
+    public static <T> T doAs(String user, final IgniteOutClosure<T> clo) {
+        if (F.isEmpty(user))
+            throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+        final String ctxUser = userStackThreadLocal.get();
+
+        if (F.eq(ctxUser, user))
+            return clo.apply(); // correct context is already there
+
+        userStackThreadLocal.set(user);
+
+        try {
+            return clo.apply();
+        }
+        finally {
+            userStackThreadLocal.set(ctxUser);
+        }
+    }
+
+    /**
+     * Has the same contract as {@link #doAs(String, IgniteOutClosure)}, but accepts
+     * a callable that may throw a checked exception.
+     * The exception is never wrapped; it is propagated to the caller as is.
+     * If your callable throws specific checked exceptions, the recommended usage pattern is:
+     * <pre name="code" class="java">
+     *  public Foo myOperation() throws MyCheckedException1, MyCheckedException2 {
+     *      try {
+     *          return IgfsUserContext.doAs(user, new Callable&lt;Foo&gt;() {
+     *              &#64;Override public Foo call() throws MyCheckedException1, MyCheckedException2 {
+     *                  return makeSomeFoo(); // do the job
+     *              }
+     *          });
+     *      }
+     *      catch (MyCheckedException1 | MyCheckedException2 | RuntimeException | Error e) {
+     *          throw e;
+     *      }
+     *      catch (Exception e) {
+     *          throw new AssertionError("Must never go there.");
+     *      }
+     *  }
+     * </pre>
+     * @param user the user name to invoke closure on behalf of.
+     * @param clbl the Callable to execute
+     * @param <T> The type of callable result.
+     * @return the result of callable execution.
+     * @throws IllegalArgumentException if the user name is null or an empty String (the callable is not validated).
+     */
+    public static <T> T doAs(String user, final Callable<T> clbl) throws Exception {
+        if (F.isEmpty(user))
+            throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+        final String ctxUser = userStackThreadLocal.get();
+
+        if (F.eq(ctxUser, user))
+            return clbl.call(); // correct context is already there
+
+        userStackThreadLocal.set(user);
+
+        try {
+            return clbl.call();
+        }
+        finally {
+            userStackThreadLocal.set(ctxUser);
+        }
+    }
+
+    /**
+     * Gets the current context user.
+     * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will
+     * return {@code null}. Otherwise it will return the user name set by the innermost
+     * {@link #doAs(String, IgniteOutClosure)} call on the call stack.
+     * @return The current user, may be null.
+     */
+    @Nullable public static String currentUser() {
+        return userStackThreadLocal.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index 9026eac..cb69352 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -198,4 +198,11 @@ public interface IgfsSecondaryFileSystem {
      * @return Map of properties.
      */
     public Map<String,String> properties();
+
+
+    /**
+     * Closes the secondary file system.
+     * @throws IgniteException in case of an error.
+     */
+    public void close() throws IgniteException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
index 4d5d146..6da45ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
@@ -1247,6 +1247,20 @@ public class ClusterMetricsSnapshot implements ClusterMetrics {
     /**
      * Serializes node metrics into byte array.
      *
+     * @param metrics Node metrics to serialize.
+     * @return Serialized node metrics as a new byte array.
+     */
+    public static byte[] serialize(ClusterMetrics metrics) {
+        byte[] buf = new byte[METRICS_SIZE];
+
+        serialize(buf, 0, metrics);
+
+        return buf;
+    }
+
+    /**
+     * Serializes node metrics into byte array.
+     *
      * @param data Byte array.
      * @param off Offset into byte array.
      * @param metrics Node metrics to serialize.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 505204d..f33fa39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
+import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.interop.*;
 import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.continuous.*;
@@ -131,41 +131,91 @@ class GridEventConsumeHandler implements GridContinuousHandler {
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
         lsnr = new GridLocalEventListener() {
+            /** Queue of pending notifications: node ID, routine ID, event. */
+            private final Queue<T3<UUID, UUID, Event>> notificationQueue = new LinkedList<>();
+
+            private boolean notificationInProgress;
+
             @Override public void onEvent(Event evt) {
-                if (filter == null || filter.apply(evt)) {
-                    if (loc) {
-                        if (!cb.apply(nodeId, evt))
-                            ctx.continuous().stopRoutine(routineId);
-                    }
-                    else {
-                        GridDiscoveryManager disco = ctx.discovery();
+                if (filter != null && !filter.apply(evt))
+                    return;
+
+                if (loc) {
+                    if (!cb.apply(nodeId, evt))
+                        ctx.continuous().stopRoutine(routineId);
+                }
+                else {
+                    if (ctx.discovery().node(nodeId) == null)
+                        return;
+
+                    synchronized (notificationQueue) {
+                        notificationQueue.add(new T3<>(nodeId, routineId, evt));
+
+                        if (!notificationInProgress) {
+                            ctx.getSystemExecutorService().submit(new Runnable() {
+                                @Override public void run() {
+                                    if (!ctx.continuous().lockStopping())
+                                        return;
 
-                        ClusterNode node = disco.node(nodeId);
+                                    try {
+                                        while (true) {
+                                            T3<UUID, UUID, Event> t3;
 
-                        if (node != null) {
-                            try {
-                                EventWrapper wrapper = new EventWrapper(evt);
+                                            synchronized (notificationQueue) {
+                                                t3 = notificationQueue.poll();
 
-                                if (evt instanceof CacheEvent) {
-                                    String cacheName = ((CacheEvent)evt).cacheName();
+                                                if (t3 == null) {
+                                                    notificationInProgress = false;
 
-                                    if (ctx.config().isPeerClassLoadingEnabled() && disco.cacheNode(node, cacheName)) {
-                                        wrapper.p2pMarshal(ctx.config().getMarshaller());
+                                                    return;
+                                                }
+                                            }
 
-                                        wrapper.cacheName = cacheName;
+                                            try {
+                                                Event evt = t3.get3();
 
-                                        GridCacheDeploymentManager depMgr =
-                                            ctx.cache().internalCache(cacheName).context().deploy();
+                                                EventWrapper wrapper = new EventWrapper(evt);
 
-                                        depMgr.prepare(wrapper);
+                                                if (evt instanceof CacheEvent) {
+                                                    String cacheName = ((CacheEvent)evt).cacheName();
+
+                                                    ClusterNode node = ctx.discovery().node(t3.get1());
+
+                                                    if (node == null)
+                                                        continue;
+
+                                                    if (ctx.config().isPeerClassLoadingEnabled()
+                                                        && ctx.discovery().cacheNode(node, cacheName)) {
+                                                        wrapper.p2pMarshal(ctx.config().getMarshaller());
+
+                                                        wrapper.cacheName = cacheName;
+
+                                                        GridCacheDeploymentManager depMgr = ctx.cache()
+                                                            .internalCache(cacheName).context().deploy();
+
+                                                        depMgr.prepare(wrapper);
+                                                    }
+                                                }
+
+                                                ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false,
+                                                    false);
+                                            }
+                                            catch (ClusterTopologyCheckedException ignored) {
+                                                // No-op.
+                                            }
+                                            catch (Throwable e) {
+                                                U.error(ctx.log(GridEventConsumeHandler.class),
+                                                    "Failed to send event notification to node: " + nodeId, e);
+                                            }
+                                        }
+                                    }
+                                    finally {
+                                        ctx.continuous().unlockStopping();
                                     }
                                 }
+                            });
 
-                                ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false, false);
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, e);
-                            }
+                            notificationInProgress = true;
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index ad7d562..d6542f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -552,4 +552,9 @@ public interface GridKernalContext extends Iterable<GridComponent> {
      * @return Marshaller context.
      */
     public MarshallerContextImpl marshallerContext();
+
+    /**
+     * @return {@code True} if local node is client node (has flag {@link IgniteConfiguration#isClientMode()} set).
+     */
+    public boolean clientNode();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 1ff483e..f921d49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -894,6 +894,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public boolean clientNode() {
+        return cfg.isClientMode();
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridKernalContextImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c4b93b8..4f5e365 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.datastructures.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.job.*;
 import org.apache.ignite.internal.processors.jobmetrics.*;
+import org.apache.ignite.internal.processors.nodevalidation.*;
 import org.apache.ignite.internal.processors.offheap.*;
 import org.apache.ignite.internal.processors.plugin.*;
 import org.apache.ignite.internal.processors.port.*;
@@ -56,7 +57,6 @@ import org.apache.ignite.internal.processors.security.*;
 import org.apache.ignite.internal.processors.segmentation.*;
 import org.apache.ignite.internal.processors.service.*;
 import org.apache.ignite.internal.processors.session.*;
-import org.apache.ignite.internal.processors.nodevalidation.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
@@ -169,11 +169,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
     /** */
     @GridToStringExclude
-    private Timer starveTimer;
+    private GridTimeoutProcessor.CancelableTask starveTask;
 
     /** */
     @GridToStringExclude
-    private Timer metricsLogTimer;
+    private GridTimeoutProcessor.CancelableTask metricsLogTask;
 
     /** Indicate error on grid stop. */
     @GridToStringExclude
@@ -867,13 +867,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         if (starveCheck) {
             final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
 
-            starveTimer = new Timer("ignite-starvation-checker");
-
-            starveTimer.scheduleAtFixedRate(new GridTimerTask() {
+            starveTask = ctx.timeout().schedule(new Runnable() {
                 /** Last completed task count. */
                 private long lastCompletedCnt;
 
-                @Override protected void safeRun() {
+                @Override public void run() {
                     if (!(execSvc instanceof ThreadPoolExecutor))
                         return;
 
@@ -896,13 +894,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         long metricsLogFreq = cfg.getMetricsLogFrequency();
 
         if (metricsLogFreq > 0) {
-            metricsLogTimer = new Timer("ignite-metrics-logger");
-
-            metricsLogTimer.scheduleAtFixedRate(new GridTimerTask() {
-                /** */
+            metricsLogTask = ctx.timeout().schedule(new Runnable() {
                 private final DecimalFormat dblFmt = new DecimalFormat("#.##");
 
-                @Override protected void safeRun() {
+                @Override public void run() {
                     if (log.isInfoEnabled()) {
                         ClusterMetrics m = cluster().localNode().metrics();
 
@@ -963,8 +958,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                             sysPoolQSize = exec.getQueue().size();
                         }
 
+                        String id = U.id8(localNode().id());
+
                         String msg = NL +
                             "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
+                            "    ^-- Node [id=" + id + ", name=" + name() + "]" + NL +
                             "    ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
                             "    ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
                                 dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +
@@ -1165,6 +1163,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
         add(ATTR_CLIENT_MODE, cfg.isClientMode());
 
+        add(ATTR_CONSISTENCY_CHECK_SKIPPED, getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK));
+
         // Build a string from JVM arguments, because parameters with spaces are split.
         SB jvmArgs = new SB(512);
 
@@ -1550,7 +1550,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                     ">>> Grid name: " + gridName + NL +
                     ">>> Local node [" +
                     "ID=" + locNode.id().toString().toUpperCase() +
-                    ", order=" + locNode.order() +
+                    ", order=" + locNode.order() + ", clientMode=" + ctx.clientNode() +
                     "]" + NL +
                     ">>> Local node addresses: " + U.addressesAsString(locNode) + NL +
                     ">>> Local ports: " + sb + NL;
@@ -1713,12 +1713,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             if (updateNtfTimer != null)
                 updateNtfTimer.cancel();
 
-            if (starveTimer != null)
-                starveTimer.cancel();
+            if (starveTask != null)
+                starveTask.close();
 
-            // Cancel metrics log timer.
-            if (metricsLogTimer != null)
-                metricsLogTimer.cancel();
+            if (metricsLogTask != null)
+                metricsLogTask.close();
 
             boolean interrupted = false;
 
@@ -2370,7 +2369,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         try {
             ctx.cache().dynamicStartCache(null, cacheName, nearCfg, true).get();
 
-            return ctx.cache().publicJCache(cacheName);
+            IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
+
+            checkNearCacheStarted(cache);
+
+            return cache;
         }
         catch (IgniteCheckedException e) {
             throw CU.convertToCacheException(e);
@@ -2397,7 +2400,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                     ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false).get();
             }
 
-            return ctx.cache().publicJCache(cacheName);
+            IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
+
+            checkNearCacheStarted(cache);
+
+            return cache;
         }
         catch (IgniteCheckedException e) {
             throw CU.convertToCacheException(e);
@@ -2407,6 +2414,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         }
     }
 
+    /**
+     * @param cache Cache.
+     */
+    private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) {
+        if (!cache.context().isNear())
+            throw new IgniteException("Failed to start near cache " +
+                "(a cache with the same name without near cache is already started)");
+    }
+
     /** {@inheritDoc} */
     @Override public void destroyCache(String cacheName) {
         guard();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 98cc3a7..928db5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -126,9 +126,12 @@ public final class IgniteNodeAttributes {
     /** Security subject for authenticated node. */
     public static final String ATTR_SECURITY_SUBJECT = ATTR_PREFIX + ".security.subject";
 
-    /** Cache interceptors. */
+    /** Client mode flag. */
     public static final String ATTR_CLIENT_MODE = ATTR_PREFIX + ".cache.client";
 
+    /** Configuration consistency check disabled flag. */
+    public static final String ATTR_CONSISTENCY_CHECK_SKIPPED = ATTR_PREFIX + ".consistency.check.skipped";
+
     /**
      * Enforces singleton.
      */