You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/09 08:33:00 UTC

[11/24] incubator-ignite git commit: ignite-545: merge from ignite-sprint-6

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
index 3f81dc4..a03d2c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.io.*;
 import java.nio.*;
@@ -33,9 +32,6 @@ import java.nio.*;
  *     | MSG_SIZE  |   MESSAGE  | MSG_SIZE  |   MESSAGE  |
  *     +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
  * </pre>
- * <p>
- * It expects that first 4 bytes in stream are {@link U#IGNITE_HEADER}. If beginning of a stream,
- * isn't equal to these bytes than exception will be thrown.
  */
 public class GridBufferedParser implements GridNioParser {
     /** Buffer metadata key. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
new file mode 100644
index 0000000..256597c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * This class implements stream parser based on {@link GridNioDelimitedBuffer}.
+ * <p>
+ * The rule for this parser is that every message sent over the stream is appended with
+ * delimiter (bytes array). So, the stream structure is as follows:
+ * <pre>
+ *     +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
+ *     |   MESSAGE  | DELIMITER  |  MESSAGE  | DELIMITER  |
+ *     +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
+ * </pre>
+ */
+public class GridDelimitedParser implements GridNioParser {
+    /** Buffer metadata key. */
+    private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Delimiter. */
+    private final byte[] delim;
+
+    /** Direct buffer. */
+    private final boolean directBuf;
+
+    /**
+     * @param delim Delimiter.
+     * @param directBuf Direct buffer.
+     */
+    public GridDelimitedParser(byte[] delim, boolean directBuf) {
+        this.delim = delim;
+        this.directBuf = directBuf;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+        GridNioDelimitedBuffer nioBuf = ses.meta(BUF_META_KEY);
+
+        // Decode for a given session is called per one thread, so there should not be any concurrency issues.
+        // However, we make some additional checks.
+        if (nioBuf == null) {
+            nioBuf = new GridNioDelimitedBuffer(delim);
+
+            GridNioDelimitedBuffer old = ses.addMeta(BUF_META_KEY, nioBuf);
+
+            assert old == null;
+        }
+
+        return nioBuf.read(buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+        byte[] msg0 = (byte[])msg;
+
+        int cap = msg0.length + delim.length;
+        ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(cap) : ByteBuffer.allocate(cap);
+
+        res.put(msg0);
+        res.put(delim);
+
+        res.flip();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return this.getClass().getSimpleName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
new file mode 100644
index 0000000..2b764ec
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Buffer with message delimiter support.
+ */
+public class GridNioDelimitedBuffer {
+    /** Delimiter. */
+    private final byte[] delim;
+
+    /** Data. */
+    private byte[] data = new byte[16384];
+
+    /** Count. */
+    private int cnt;
+
+    /** Index. */
+    private int idx;
+
+    /**
+     * @param delim Delimiter.
+     */
+    public GridNioDelimitedBuffer(byte[] delim) {
+        assert delim != null;
+        assert delim.length > 0;
+
+        this.delim = delim;
+
+        reset();
+    }
+
+    /**
+     * Resets buffer state.
+     */
+    private void reset() {
+        cnt = 0;
+        idx = 0;
+    }
+
+    /**
+     * @param buf Buffer.
+     * @return Message bytes or {@code null} if message is not fully read yet.
+     */
+    @Nullable public byte[] read(ByteBuffer buf) {
+        while(buf.hasRemaining()) {
+            if (cnt == data.length)
+                data = Arrays.copyOf(data, data.length * 2);
+
+            byte b = buf.get();
+
+            data[cnt++] = b;
+
+            if (b == delim[idx])
+                idx++;
+            else if (idx > 0) {
+                int pos = cnt - idx;
+
+                idx = 0;
+
+                for (int i = pos; i < cnt; i++) {
+                    if (data[pos] == delim[idx]) {
+                        pos++;
+
+                        idx++;
+                    }
+                    else {
+                        pos = cnt - idx;
+
+                        idx = 0;
+                    }
+                }
+            }
+
+            if (idx == delim.length) {
+                byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length);
+
+                reset();
+
+                return bytes;
+            }
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
index 49850ab..a945262 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
@@ -201,6 +201,15 @@ public interface GridOffHeapPartitionedMap {
     public <T> GridCloseableIterator<T> iterator(CX2<T2<Long, Integer>, T2<Long, Integer>, T> c);
 
     /**
+     * Gets iterator over the partition.
+     *
+     * @param c Key/value closure.
+     * @param part Partition.
+     * @return Iterator over the partition.
+     */
+    public <T> GridCloseableIterator<T> iterator(CX2<T2<Long, Integer>, T2<Long, Integer>, T> c, int part);
+
+    /**
      * Gets iterator over certain partition.
      *
      * @param p Partition.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
index ba67b30..4ffc33f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
@@ -277,21 +277,8 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
 
     /** {@inheritDoc} */
     @Override public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator() {
-        return new GridCloseableIteratorAdapter<IgniteBiTuple<byte[], byte[]>>() {
-            private int p;
-
-            private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> curIt;
-
-            {
-                try {
-                    advance();
-                }
-                catch (IgniteCheckedException e) {
-                    e.printStackTrace(); // Should never happen.
-                }
-            }
-
-            private void advance() throws IgniteCheckedException {
+        return new PartitionedMapCloseableIterator<IgniteBiTuple<byte[], byte[]>>() {
+            protected void advance() throws IgniteCheckedException {
                 curIt = null;
 
                 while (p < parts) {
@@ -305,34 +292,6 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
 
                 curIt = null;
             }
-
-            @Override protected IgniteBiTuple<byte[], byte[]> onNext() throws IgniteCheckedException {
-                if (curIt == null)
-                    throw new NoSuchElementException();
-
-                IgniteBiTuple<byte[], byte[]> t = curIt.next();
-
-                if (!curIt.hasNext()) {
-                    curIt.close();
-
-                    advance();
-                }
-
-                return t;
-            }
-
-            @Override protected boolean onHasNext() {
-                return curIt != null;
-            }
-
-            @Override protected void onRemove() {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override protected void onClose() throws IgniteCheckedException {
-                if (curIt != null)
-                    curIt.close();
-            }
         };
     }
 
@@ -340,21 +299,8 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
     @Override public <T> GridCloseableIterator<T> iterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c) {
         assert c != null;
 
-        return new GridCloseableIteratorAdapter<T>() {
-            private int p;
-
-            private GridCloseableIterator<T> curIt;
-
-            {
-                try {
-                    advance();
-                }
-                catch (IgniteCheckedException e) {
-                    e.printStackTrace(); // Should never happen.
-                }
-            }
-
-            private void advance() throws IgniteCheckedException {
+        return new PartitionedMapCloseableIterator<T>() {
+            protected void advance() throws IgniteCheckedException {
                 curIt = null;
 
                 while (p < parts) {
@@ -368,38 +314,16 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
 
                 curIt = null;
             }
-
-            @Override protected T onNext() throws IgniteCheckedException {
-                if (curIt == null)
-                    throw new NoSuchElementException();
-
-                T t = curIt.next();
-
-                if (!curIt.hasNext()) {
-                    curIt.close();
-
-                    advance();
-                }
-
-                return t;
-            }
-
-            @Override protected boolean onHasNext() {
-                return curIt != null;
-            }
-
-            @Override protected void onRemove() {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override protected void onClose() throws IgniteCheckedException {
-                if (curIt != null)
-                    curIt.close();
-            }
         };
     }
 
     /** {@inheritDoc} */
+    @Override public <T> GridCloseableIterator<T> iterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c,
+       int part) {
+       return mapFor(part).iterator(c);
+    }
+
+    /** {@inheritDoc} */
     @Override public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator(int p) {
         return mapFor(p).iterator();
     }
@@ -430,4 +354,63 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
     public long lruSize() {
         return lru.size();
     }
+
+    /**
+     *  Partitioned closable iterator.
+     */
+    private abstract class PartitionedMapCloseableIterator<T> extends GridCloseableIteratorAdapter<T> {
+        /** Current partition. */
+        protected int p;
+
+        /** Current iterator. */
+        protected GridCloseableIterator<T> curIt;
+
+        {
+            try {
+                advance();
+            }
+            catch (IgniteCheckedException e) {
+                e.printStackTrace(); // Should never happen.
+            }
+        }
+
+        /**
+         * Switch to next partition.
+         *
+         * @throws IgniteCheckedException If failed.
+         */
+        abstract void advance() throws IgniteCheckedException;
+
+        /** {@inheritDoc} */
+        @Override protected T onNext() throws IgniteCheckedException {
+            if (curIt == null)
+                throw new NoSuchElementException();
+
+            T t = curIt.next();
+
+            if (!curIt.hasNext()) {
+                curIt.close();
+
+                advance();
+            }
+
+            return t;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected boolean onHasNext() {
+            return curIt != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onRemove() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onClose() throws IgniteCheckedException {
+            if (curIt != null)
+                curIt.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelper.java
index 82ef421..a9e9e93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelper.java
@@ -69,7 +69,7 @@ public interface IgniteSpringHelper {
      * Loads all configurations with given type specified within given configuration file.
      *
      * @param cfgUrl Configuration file path or URL. This cannot be {@code null}.
-     * @param cl Required type of configuration.
+     * @param cls Required type of configuration.
      * @param excludedProps Properties to exclude.
      * @return Tuple containing all loaded configurations and Spring context used to load them.
      * @throws IgniteCheckedException If configuration could not be read.
@@ -81,7 +81,7 @@ public interface IgniteSpringHelper {
      * Loads all configurations with given type specified within given configuration input stream.
      *
      * @param cfgStream Configuration input stream. This cannot be {@code null}.
-     * @param cl Required type of configuration.
+     * @param cls Required type of configuration.
      * @param excludedProps Properties to exclude.
      * @return Tuple containing all loaded configurations and Spring context used to load them.
      * @throws IgniteCheckedException If configuration could not be read.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
index c05b9e0..30be424 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
@@ -29,7 +29,7 @@ import java.io.*;
  */
 public class VisorCacheMetrics implements Serializable {
     /** */
-    private static final int MICROSECONDS_IN_SECOND = 1_000_000;
+    private static final float MICROSECONDS_IN_SECOND = 1_000_000;
 
     /** */
     private static final long serialVersionUID = 0L;
@@ -94,14 +94,11 @@ public class VisorCacheMetrics implements Serializable {
     /** Reads per second. */
     private int readsPerSec;
 
-    /** Writes per second. */
-    private int writesPerSec;
+    /** Puts per second. */
+    private int putsPerSec;
 
-    /** Hits per second. */
-    private int hitsPerSec;
-
-    /** Misses per second. */
-    private int missesPerSec;
+    /** Removes per second. */
+    private int removalsPerSec;
 
     /** Commits per second. */
     private int commitsPerSec;
@@ -160,15 +157,11 @@ public class VisorCacheMetrics implements Serializable {
     /**
      * Calculate rate of metric per second.
      *
-     * @param metric Metric value.
-     * @param time Metric finish time.
-     * @param createTime Metric start time.
+     * @param meanTime Metric mean time.
      * @return Metric per second.
      */
-    private static int perSecond(int metric, long time, long createTime) {
-        long seconds = (time - createTime) / 1000;
-
-        return (seconds > 0) ? (int)(metric / seconds) : 0;
+    private static int perSecond(float meanTime) {
+        return (meanTime > 0) ? (int)(MICROSECONDS_IN_SECOND / meanTime) : 0;
     }
 
     /**
@@ -209,12 +202,11 @@ public class VisorCacheMetrics implements Serializable {
         cm.avgPutTime = m.getAveragePutTime();
         cm.avgRemovalTime = m.getAverageRemoveTime();
 
-        cm.readsPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAverageGetTime());
-        cm.writesPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAveragePutTime());
-        cm.hitsPerSec = -1;
-        cm.missesPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAverageRemoveTime());
-        cm.commitsPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAverageTxCommitTime());
-        cm.rollbacksPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAverageTxRollbackTime());
+        cm.readsPerSec = perSecond(m.getAverageGetTime());
+        cm.putsPerSec = perSecond(m.getAveragePutTime());
+        cm.removalsPerSec = perSecond(m.getAverageRemoveTime());
+        cm.commitsPerSec = perSecond(m.getAverageTxCommitTime());
+        cm.rollbacksPerSec = perSecond(m.getAverageTxRollbackTime());
 
         cm.qryMetrics = VisorCacheQueryMetrics.from(c.context().queries().metrics());
 
@@ -364,24 +356,17 @@ public class VisorCacheMetrics implements Serializable {
     }
 
     /**
-     * @return Writes per second.
-     */
-    public int writesPerSecond() {
-        return writesPerSec;
-    }
-
-    /**
-     * @return Hits per second.
+     * @return Puts per second.
      */
-    public int hitsPerSecond() {
-        return hitsPerSec;
+    public int putsPerSecond() {
+        return putsPerSec;
     }
 
     /**
-     * @return Misses per second.
+     * @return Removes per second.
      */
-    public int missesPerSecond() {
-        return missesPerSec;
+    public int removalsPerSecond() {
+        return removalsPerSec;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNearConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNearConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNearConfiguration.java
index a968f4f..3706650 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNearConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNearConfiguration.java
@@ -33,7 +33,7 @@ public class VisorCacheNearConfiguration implements Serializable {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Flag to enable/disable near cache eviction policy. */
+    /** Flag indicating if near cache enabled. */
     private boolean nearEnabled;
 
     /** Near cache start size. */
@@ -66,7 +66,7 @@ public class VisorCacheNearConfiguration implements Serializable {
     }
 
     /**
-     * @return Flag to enable/disable near cache eviction policy.
+     * @return {@code true} if near cache enabled.
      */
     public boolean nearEnabled() {
         return nearEnabled;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStartTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStartTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStartTask.java
new file mode 100644
index 0000000..2aa03a2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStartTask.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.task.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.visor.*;
+import org.apache.ignite.internal.visor.util.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Task that start cache or near cache with specified configuration.
+ */
+@GridInternal
+public class VisorCacheStartTask extends
+    VisorMultiNodeTask<VisorCacheStartTask.VisorCacheStartArg, Map<UUID, IgniteException>, Void> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorCacheStartJob job(VisorCacheStartArg arg) {
+        return new VisorCacheStartJob(arg, debug);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected Map<UUID, IgniteException> reduce0(List<ComputeJobResult> results) throws IgniteException {
+        Map<UUID, IgniteException> map = new HashMap<>();
+
+        for (ComputeJobResult res : results)
+            if (res.getException() != null)
+                map.put(res.getNode().id(), res.getException());
+
+        return map;
+    }
+
+    /**
+     * Cache start arguments.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    public static class VisorCacheStartArg implements Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final boolean near;
+
+        /** */
+        private final String name;
+
+        /** */
+        private final String cfg;
+
+        /**
+         * @param near {@code true} if near cache should be started.
+         * @param name Name for near cache.
+         * @param cfg Cache XML configuration.
+         */
+        public VisorCacheStartArg(boolean near, String name, String cfg) {
+            this.near = near;
+            this.name = name;
+            this.cfg = cfg;
+        }
+
+        /**
+         * @return {@code true} if near cache should be started.
+         */
+        public boolean near() {
+            return near;
+        }
+
+        /**
+         * @return Name for near cache.
+         */
+        public String name() {
+            return name;
+        }
+
+        /**
+         * @return Cache XML configuration.
+         */
+        public String configuration() {
+            return cfg;
+        }
+    }
+
+    /**
+     * Job that start cache or near cache with specified configuration.
+     */
+    private static class VisorCacheStartJob extends VisorJob<VisorCacheStartArg, Void> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Create job.
+         *
+         * @param arg Contains cache name and XML configurations of cache.
+         * @param debug Debug flag.
+         */
+        private VisorCacheStartJob(VisorCacheStartArg arg, boolean debug) {
+            super(arg, debug);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Void run(VisorCacheStartArg arg) throws IgniteException {
+            String cfg = arg.configuration();
+
+            assert !F.isEmpty(cfg);
+
+            try (ByteArrayInputStream bais = new ByteArrayInputStream(cfg.getBytes())) {
+                if (arg.near) {
+                    NearCacheConfiguration nearCfg = Ignition.loadSpringBean(bais, "nearCacheConfiguration");
+
+                    ignite.createNearCache(VisorTaskUtils.unescapeName(arg.name()), nearCfg);
+                }
+                else {
+                    CacheConfiguration cacheCfg = Ignition.loadSpringBean(bais, "cacheConfiguration");
+
+                    ignite.createCache(cacheCfg);
+                }
+            }
+            catch (IOException e) {
+                throw new  IgniteException(e);
+            }
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(VisorCacheStartJob.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
index becebda..5050414 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
@@ -32,17 +32,22 @@ public class VisorQueryArg implements Serializable {
     /** Query text. */
     private final String qryTxt;
 
+    /** Flag whether to execute query locally. */
+    private final boolean local;
+
     /** Result batch size. */
     private final int pageSize;
 
     /**
      * @param cacheName Cache name for query.
      * @param qryTxt Query text.
+     * @param local Flag whether to execute query locally.
      * @param pageSize Result batch size.
      */
-    public VisorQueryArg(String cacheName, String qryTxt, int pageSize) {
+    public VisorQueryArg(String cacheName, String qryTxt, boolean local, int pageSize) {
         this.cacheName = cacheName;
         this.qryTxt = qryTxt;
+        this.local = local;
         this.pageSize = pageSize;
     }
 
@@ -61,6 +66,13 @@ public class VisorQueryArg implements Serializable {
     }
 
     /**
+     * @return {@code true} if query should be executed locally.
+     */
+    public boolean local() {
+        return local;
+    }
+
+    /**
      * @return Page size.
      */
     public int pageSize() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
index ebf62fa..4a9daad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
@@ -75,6 +75,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
             if (scan) {
                 ScanQuery<Object, Object> qry = new ScanQuery<>(null);
                 qry.setPageSize(arg.pageSize());
+                qry.setLocal(arg.local());
 
                 long start = U.currentTimeMillis();
 
@@ -100,6 +101,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
             else {
                 SqlFieldsQuery qry = new SqlFieldsQuery(arg.queryTxt());
                 qry.setPageSize(arg.pageSize());
+                qry.setLocal(arg.local());
 
                 long start = U.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
index 7cfc18f..e8ae76d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
@@ -120,6 +120,16 @@ public class VisorTaskUtils {
     }
 
     /**
+     * @param name Escaped name.
+     * @return Name or {@code null} for default name.
+     */
+    public static String unescapeName(String name) {
+        assert name != null;
+
+        return DFLT_EMPTY_NAME.equals(name) ? null : name;
+    }
+
+    /**
      * @param a First name.
      * @param b Second name.
      * @return {@code true} if both names equals.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
index 76ebcee..be05a38 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
@@ -22,9 +22,9 @@ package org.apache.ignite.lang;
  */
 public interface IgniteAsyncSupport {
     /**
-     * Gets component with asynchronous mode enabled.
+     * Gets instance of this component with asynchronous mode enabled.
      *
-     * @return Component with asynchronous mode enabled.
+     * @return Instance of this component with asynchronous mode enabled.
      */
     public IgniteAsyncSupport withAsync();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/services/Service.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/services/Service.java b/modules/core/src/main/java/org/apache/ignite/services/Service.java
index 2bd5649..4f927a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/services/Service.java
+++ b/modules/core/src/main/java/org/apache/ignite/services/Service.java
@@ -55,10 +55,7 @@ import java.io.*;
  * ...
  * GridServices svcs = grid.services();
  *
- * GridFuture&lt;?&gt; fut = svcs.deployClusterSingleton("mySingleton", new MyGridService());
- *
- * // Wait for deployment to complete.
- * fut.get();
+ * svcs.deployClusterSingleton("mySingleton", new MyGridService());
  * </pre>
  * Or from grid configuration on startup:
  * <pre name="code" class="java">

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index b43f8a5..871512c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -67,7 +67,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
     private String name;
 
     /** Grid SPI context. */
-    private volatile IgniteSpiContext spiCtx = new GridDummySpiContext(null, false);
+    private volatile IgniteSpiContext spiCtx = new GridDummySpiContext(null, false, null);
 
     /** Discovery listener. */
     private GridLocalEventListener paramsLsnr;
@@ -190,7 +190,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         ClusterNode locNode = spiCtx == null ? null : spiCtx.localNode();
 
         // Set dummy no-op context.
-        spiCtx = new GridDummySpiContext(locNode, true);
+        spiCtx = new GridDummySpiContext(locNode, true, spiCtx);
     }
 
     /**
@@ -551,15 +551,24 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         /** */
         private final boolean stopping;
 
+        /** */
+        private final MessageFactory msgFactory;
+
+        /** */
+        private final MessageFormatter msgFormatter;
+
         /**
          * Create temp SPI context.
          *
          * @param locNode Local node.
          * @param stopping Node stopping flag.
+         * @param spiCtx SPI context.
          */
-        GridDummySpiContext(ClusterNode locNode, boolean stopping) {
+        GridDummySpiContext(ClusterNode locNode, boolean stopping, @Nullable IgniteSpiContext spiCtx) {
             this.locNode = locNode;
             this.stopping = stopping;
+            this.msgFactory = spiCtx != null ? spiCtx.messageFactory() : null;
+            this.msgFormatter = spiCtx != null ? spiCtx.messageFormatter() : null;
         }
 
         /** {@inheritDoc} */
@@ -711,12 +720,12 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
         /** {@inheritDoc} */
         @Override public MessageFormatter messageFormatter() {
-            return null;
+            return msgFormatter;
         }
 
         /** {@inheritDoc} */
         @Override public MessageFactory messageFactory() {
-            return null;
+            return msgFactory;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2d5c541..fd17791 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -203,7 +203,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final int DFLT_ACK_SND_THRESHOLD = 16;
 
     /** Default socket write timeout. */
-    public static final long DFLT_SOCK_WRITE_TIMEOUT = GridNioServer.DFLT_SES_WRITE_TIMEOUT;
+    public static final long DFLT_SOCK_WRITE_TIMEOUT = 200;
 
     /** No-op runnable. */
     private static final IgniteRunnable NOOP = new IgniteRunnable() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index e3182c4..4196306 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -226,10 +226,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
         if (netTimeout < 3000)
             U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
 
-        // Warn on odd heartbeat frequency.
-        if (hbFreq < 2000)
-            U.warn(log, "Heartbeat frequency is too high (at least 2000 ms recommended): " + hbFreq);
-
         registerMBean(gridName, this, TcpClientDiscoverySpiMBean.class);
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 206a8c3..ed0e9dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -288,6 +288,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private ConcurrentLinkedDeque<String> debugLog;
 
+    /** */
+    private final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs =
+        new CopyOnWriteArrayList<>();
+
     /** {@inheritDoc} */
     @IgniteInstanceResource
     @Override public void injectResources(Ignite ignite) {
@@ -853,10 +857,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
         if (netTimeout < 3000)
             U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
 
-        // Warn on odd heartbeat frequency.
-        if (hbFreq < 2000)
-            U.warn(log, "Heartbeat frequency is too high (at least 2000 ms recommended): " + hbFreq);
-
         registerMBean(gridName, this, TcpDiscoverySpiMBean.class);
 
         if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
@@ -2068,13 +2068,16 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
 
     /**
      * <strong>FOR TEST ONLY!!!</strong>
-     * <p>
-     * This method is intended for test purposes only.
-     *
-     * @param msg Message.
      */
-    void onBeforeMessageSentAcrossRing(Serializable msg) {
-        // No-op.
+    public void addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) {
+        sendMsgLsnrs.add(msg);
+    }
+
+    /**
+     * <strong>FOR TEST ONLY!!!</strong>
+     */
+    public void removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) {
+        sendMsgLsnrs.remove(msg);
     }
 
     /**
@@ -2683,11 +2686,30 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
 
             assert ring.hasRemoteNodes();
 
-            onBeforeMessageSentAcrossRing(msg);
+            for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : sendMsgLsnrs)
+                msgLsnr.apply(msg);
 
             if (redirectToClients(msg)) {
-                for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values())
-                    clientMsgWorker.addMessage(msg);
+                byte[] marshalledMsg = null;
+
+                for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
+                    // Send a clone to client to avoid ConcurrentModificationException
+                    TcpDiscoveryAbstractMessage msgClone;
+
+                    try {
+                        if (marshalledMsg == null)
+                            marshalledMsg = marsh.marshal(msg);
+
+                        msgClone = marsh.unmarshal(marshalledMsg, null);
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to marshal message: " + msg, e);
+
+                        msgClone = msg;
+                    }
+
+                    clientMsgWorker.addMessage(msgClone);
+                }
             }
 
             Collection<TcpDiscoveryNode> failedNodes;
@@ -3930,18 +3952,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                 long topVer;
 
                 if (locNodeCoord) {
-                    if (!msg.client() && ipFinder.isShared()) {
-                        try {
-                            ipFinder.unregisterAddresses(leftNode.socketAddresses());
-                        }
-                        catch (IgniteSpiException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to unregister left node address: " + leftNode);
-
-                            onException("Failed to unregister left node address: " + leftNode, e);
-                        }
-                    }
-
                     topVer = ring.incrementTopologyVersion();
 
                     msg.topologyVersion(topVer);
@@ -4109,20 +4119,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                 long topVer;
 
                 if (locNodeCoord) {
-                    if (!node.isClient() && ipFinder.isShared()) {
-                        try {
-                            ipFinder.unregisterAddresses(node.socketAddresses());
-                        }
-                        catch (IgniteSpiException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to unregister failed node address [node=" + node +
-                                    ", err=" + e.getMessage() + ']');
-
-                            onException("Failed to unregister failed node address [node=" + node +
-                                ", err=" + e.getMessage() + ']', e);
-                        }
-                    }
-
                     topVer = ring.incrementTopologyVersion();
 
                     msg.topologyVersion(topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 26f6869..802da02 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -51,20 +51,20 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
     /** Default port to listen (value is <tt>47500</tt>). */
     public static final int DFLT_PORT = 47500;
 
-    /** Default socket operations timeout in milliseconds (value is <tt>2,000ms</tt>). */
-    public static final long DFLT_SOCK_TIMEOUT = 2000;
+    /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
+    public static final long DFLT_SOCK_TIMEOUT = 200;
 
-    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */
-    public static final long DFLT_ACK_TIMEOUT = 5000;
+    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */
+    public static final long DFLT_ACK_TIMEOUT = 50;
 
-    /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */
+    /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */
     public static final long DFLT_NETWORK_TIMEOUT = 5000;
 
     /** Default value for thread priority (value is <tt>10</tt>). */
     public static final int DFLT_THREAD_PRI = 10;
 
-    /** Default heartbeat messages issuing frequency (value is <tt>2,000ms</tt>). */
-    public static final long DFLT_HEARTBEAT_FREQ = 2000;
+    /** Default heartbeat messages issuing frequency (value is <tt>100ms</tt>). */
+    public static final long DFLT_HEARTBEAT_FREQ = 100;
 
     /** Default size of topology snapshots history. */
     public static final int DFLT_TOP_HISTORY_SIZE = 1000;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
new file mode 100644
index 0000000..0c4e2d1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import org.apache.ignite.*;
+
+import java.util.*;
+
+/**
+ * Convenience adapter for streamers. Adapters are optional components for
+ * streaming from different data sources. The purpose of adapters is to
+ * convert different message formats into Ignite stream key-value tuples
+ * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
+ */
+public abstract class StreamAdapter<T, K, V> {
+    /** Tuple extractor. */
+    private StreamTupleExtractor<T, K, V> extractor;
+
+    /** Streamer. */
+    private IgniteDataStreamer<K, V> stmr;
+
+    /** Ignite. */
+    private Ignite ignite;
+
+    /**
+     * Empty constructor.
+     */
+    protected StreamAdapter() {
+        // No-op.
+    }
+
+    /**
+     * Stream adapter.
+     *
+     * @param stmr Streamer.
+     * @param extractor Tuple extractor.
+     */
+    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+        this.stmr = stmr;
+        this.extractor = extractor;
+    }
+
+    /**
+     * @return Provided data streamer.
+     */
+    public IgniteDataStreamer<K, V> getStreamer() {
+        return stmr;
+    }
+
+    /**
+     * @param stmr Ignite data streamer.
+     */
+    public void setStreamer(IgniteDataStreamer<K, V> stmr) {
+        this.stmr = stmr;
+    }
+
+    /**
+     * @return Provided tuple extractor.
+     */
+    public StreamTupleExtractor<T, K, V> getTupleExtractor() {
+        return extractor;
+    }
+
+    /**
+     * @param extractor Extractor for key-value tuples from messages.
+     */
+    public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
+        this.extractor = extractor;
+    }
+
+    /**
+     * @return Provided {@link Ignite} instance.
+     */
+    public Ignite getIgnite() {
+        return ignite;
+    }
+
+    /**
+     * @param ignite {@link Ignite} instance.
+     */
+    public void setIgnite(Ignite ignite) {
+        this.ignite = ignite;
+    }
+
+    /**
+     * Converts given message to a tuple and adds it to the underlying streamer.
+     *
+     * @param msg Message to convert.
+     */
+    protected void addMessage(T msg) {
+        Map.Entry<K, V> e = extractor.extract(msg);
+
+        if (e != null)
+            stmr.addData(e);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
new file mode 100644
index 0000000..d2a4ede
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.*;
+
+/**
+ * Stream tuple extractor to convert messages to Ignite key-value tuples.
+ */
+public interface StreamTupleExtractor<T, K, V> {
+    /**
+     * Extracts a key-value tuple from a message.
+     *
+     * @param msg Message.
+     * @return Key-value tuple.
+     */
+    public Map.Entry<K, V> extract(T msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
new file mode 100644
index 0000000..8161d86
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+/**
+ * Socket message converter.
+ */
+public interface SocketMessageConverter<T> {
+    /**
+     * Converter message represented by array of bytes to object.
+     *
+     * @param msg Message.
+     * @return Converted object.
+     */
+    public T convert(byte[] msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
new file mode 100644
index 0000000..07ce77e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.stream.*;
+
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.nio.*;
+
+/**
+ * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
+ * streams into {@link IgniteDataStreamer} instance.
+ * <p>
+ * By default server uses size-based message processing. That is every message sent over the socket is prepended with
+ * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then
+ * delimiter-based message processing will be used. That is every message sent over the socket is appended with
+ * provided delimiter.
+ * <p>
+ * Received messages through socket converts to Java object using standard serialization. Conversion functionality
+ * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from
+ * non Java clients).
+ */
+public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+    /** Default threads. */
+    private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Address. */
+    private InetAddress addr;
+
+    /** Server port. */
+    private int port;
+
+    /** Threads number. */
+    private int threads = DFLT_THREADS;
+
+    /** Direct mode. */
+    private boolean directMode;
+
+    /** Delimiter. */
+    private byte[] delim;
+
+    /** Converter. */
+    private SocketMessageConverter<T> converter;
+
+    /** Server. */
+    private GridNioServer<byte[]> srv;
+
+    /**
+     * Sets server address.
+     *
+     * @param addr Address.
+     */
+    public void setAddr(InetAddress addr) {
+        this.addr = addr;
+    }
+
+    /**
+     * Sets port number.
+     *
+     * @param port Port.
+     */
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * Sets threadds amount.
+     *
+     * @param threads Threads.
+     */
+    public void setThreads(int threads) {
+        this.threads = threads;
+    }
+
+    /**
+     * Sets direct mode flag.
+     *
+     * @param directMode Direct mode.
+     */
+    public void setDirectMode(boolean directMode) {
+        this.directMode = directMode;
+    }
+
+    /**
+     * Sets message delimiter.
+     *
+     * @param delim Delimiter.
+     */
+    public void setDelimiter(byte[] delim) {
+        this.delim = delim;
+    }
+
+    /**
+     * Sets message converter.
+     *
+     * @param converter Converter.
+     */
+    public void setConverter(SocketMessageConverter<T> converter) {
+        this.converter = converter;
+    }
+
+    /**
+     * Starts streamer.
+     *
+     * @throws IgniteException If failed.
+     */
+    public void start() {
+        A.notNull(getTupleExtractor(), "tupleExtractor");
+        A.notNull(getStreamer(), "streamer");
+        A.notNull(getIgnite(), "ignite");
+        A.ensure(threads > 0, "threads > 0");
+
+        log = getIgnite().log();
+
+        GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
+            @Override public void onConnected(GridNioSession ses) {
+                assert ses.accepted();
+
+                if (log.isDebugEnabled())
+                    log.debug("Accepted connection: " + ses.remoteAddress());
+            }
+
+            @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                if (e != null)
+                    log.error("Connection failed with exception", e);
+            }
+
+            @Override public void onMessage(GridNioSession ses, byte[] msg) {
+                addMessage(converter.convert(msg));
+            }
+        };
+
+        ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
+
+        GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) :
+            new GridDelimitedParser(delim, directMode);
+
+        if (converter == null)
+            converter = new DefaultConverter<>();
+
+        GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
+
+        GridNioFilter[] filters = new GridNioFilter[] {codec};
+
+        try {
+            srv = new GridNioServer.Builder<byte[]>()
+                .address(addr == null ? InetAddress.getLocalHost() : addr)
+                .port(port)
+                .listener(lsnr)
+                .logger(log)
+                .selectorCount(threads)
+                .byteOrder(byteOrder)
+                .filters(filters)
+                .build();
+        }
+        catch (IgniteCheckedException | UnknownHostException e) {
+            throw new IgniteException(e);
+        }
+
+        srv.start();
+
+        if (log.isDebugEnabled())
+            log.debug("Socket streaming server started on " + addr + ':' + port);
+    }
+
+    /**
+     * Stops streamer.
+     */
+    public void stop() {
+        srv.stop();
+
+        if (log.isDebugEnabled())
+            log.debug("Socket streaming server stopped");
+    }
+
+    /**
+     * Converts message to Java object using Jdk marshaller.
+     */
+    private static class DefaultConverter<T> implements SocketMessageConverter<T> {
+        /** Marshaller. */
+        private static final JdkMarshaller MARSH = new JdkMarshaller();
+
+        /** {@inheritDoc} */
+        @Override public T convert(byte[] msg) {
+            try {
+                return MARSH.unmarshal(msg, null);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
new file mode 100644
index 0000000..e1cef65
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains socket streamer implementation.
+ */
+package org.apache.ignite.stream.socket;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index a79d5b8..b3eed46 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -115,6 +115,7 @@ org.apache.ignite.configuration.CollectionConfiguration
 org.apache.ignite.configuration.DeploymentMode
 org.apache.ignite.configuration.IgniteReflectionFactory
 org.apache.ignite.configuration.NearCacheConfiguration
+org.apache.ignite.configuration.TopologyValidator
 org.apache.ignite.configuration.TransactionConfiguration
 org.apache.ignite.events.CacheEvent
 org.apache.ignite.events.CacheQueryExecutedEvent
@@ -248,6 +249,7 @@ org.apache.ignite.internal.managers.communication.GridIoManager$ConcurrentHashMa
 org.apache.ignite.internal.managers.communication.GridIoMessage
 org.apache.ignite.internal.managers.communication.GridIoPolicy
 org.apache.ignite.internal.managers.communication.GridIoUserMessage
+org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter
 org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean
 org.apache.ignite.internal.managers.deployment.GridDeploymentPerVersionStore$2
 org.apache.ignite.internal.managers.deployment.GridDeploymentRequest
@@ -321,13 +323,13 @@ org.apache.ignite.internal.processors.cache.GridCacheAdapter$72
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$73
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$74
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$9
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllJob
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearCallable
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetJob
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalSizeCallable
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadCacheClosure
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadKeysCallable
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeJob
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetTimeStatClosure
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAndGetTimeStatClosure
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutTimeStatClosure
@@ -445,6 +447,7 @@ org.apache.ignite.internal.processors.cache.IgniteCacheProxy$2
 org.apache.ignite.internal.processors.cache.IgniteCacheProxy$4
 org.apache.ignite.internal.processors.cache.IgniteCacheProxy$5
 org.apache.ignite.internal.processors.cache.IgniteCacheProxy$6
+org.apache.ignite.internal.processors.cache.IgniteCacheProxy$7
 org.apache.ignite.internal.processors.cache.KeyCacheObject
 org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl
 org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy
@@ -452,8 +455,9 @@ org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresMa
 org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$QueueHeaderPredicate
 org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$RemoveSetDataCallable
 org.apache.ignite.internal.processors.cache.distributed.GridCacheCommittedTxInfo
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxRequest
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxResponse
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture$1
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse
 org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest
 org.apache.ignite.internal.processors.cache.distributed.GridDistributedBaseMessage
 org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter
@@ -725,6 +729,7 @@ org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$11
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$12
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$13
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$14
+org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$15
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$2
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$3
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$4
@@ -762,9 +767,11 @@ org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter$Po
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter$PostLockClosure2
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter$PostMissClosure
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$2
+org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$3
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$AtomicInt
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$CommitListener
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$CommittedVersion
+org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$NodeFailureTimeoutObject$1
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxMap
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxMap$1
 org.apache.ignite.internal.processors.cache.transactions.IgniteTxMap$1$1
@@ -809,6 +816,7 @@ org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$Discove
 org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryDataItem
 org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$StartRequestData
 org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$3
+org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$4
 org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters$Batched
 org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters$BatchedSorted
 org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters$Individual
@@ -1132,7 +1140,6 @@ org.apache.ignite.internal.util.lang.GridAbsClosure
 org.apache.ignite.internal.util.lang.GridAbsClosureX
 org.apache.ignite.internal.util.lang.GridCloseableIterator
 org.apache.ignite.internal.util.lang.GridClosureException
-org.apache.ignite.internal.util.lang.GridComputeJobWrapper
 org.apache.ignite.internal.util.lang.GridFunc$1
 org.apache.ignite.internal.util.lang.GridFunc$10
 org.apache.ignite.internal.util.lang.GridFunc$100
@@ -1383,6 +1390,8 @@ org.apache.ignite.internal.visor.cache.VisorCacheMetrics
 org.apache.ignite.internal.visor.cache.VisorCacheMetricsCollectorTask
 org.apache.ignite.internal.visor.cache.VisorCacheMetricsCollectorTask$VisorCacheMetricsCollectorJob
 org.apache.ignite.internal.visor.cache.VisorCacheNearConfiguration
+org.apache.ignite.internal.visor.cache.VisorCacheNodesTask
+org.apache.ignite.internal.visor.cache.VisorCacheNodesTask$VisorCacheNodesJob
 org.apache.ignite.internal.visor.cache.VisorCacheQueryConfiguration
 org.apache.ignite.internal.visor.cache.VisorCacheQueryMetrics
 org.apache.ignite.internal.visor.cache.VisorCacheRebalanceConfiguration
@@ -1390,6 +1399,9 @@ org.apache.ignite.internal.visor.cache.VisorCacheRebalanceTask
 org.apache.ignite.internal.visor.cache.VisorCacheRebalanceTask$VisorCachesRebalanceJob
 org.apache.ignite.internal.visor.cache.VisorCacheResetMetricsTask
 org.apache.ignite.internal.visor.cache.VisorCacheResetMetricsTask$VisorCacheResetMetricsJob
+org.apache.ignite.internal.visor.cache.VisorCacheStartTask
+org.apache.ignite.internal.visor.cache.VisorCacheStartTask$VisorCacheStartArg
+org.apache.ignite.internal.visor.cache.VisorCacheStartTask$VisorCacheStartJob
 org.apache.ignite.internal.visor.cache.VisorCacheStopTask
 org.apache.ignite.internal.visor.cache.VisorCacheStopTask$VisorCacheStopJob
 org.apache.ignite.internal.visor.cache.VisorCacheStoreConfiguration

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/resources/ignite.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/ignite.properties b/modules/core/src/main/resources/ignite.properties
index 549bde3..78e294f 100644
--- a/modules/core/src/main/resources/ignite.properties
+++ b/modules/core/src/main/resources/ignite.properties
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-ignite.version=1.0.0
+ignite.version=1.2.0-SNAPSHOT
 ignite.build=0
 ignite.revision=DEV
 ignite.rel.date=01011970

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
index 2b119ec..abc9109 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
@@ -18,10 +18,13 @@
 package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.*;
 import org.apache.ignite.testframework.junits.common.*;
-import org.h2.constant.*;
 import org.jetbrains.annotations.*;
 
+import java.util.*;
 import java.util.concurrent.*;
 
 /**
@@ -29,6 +32,8 @@ import java.util.concurrent.*;
  */
 @GridCommonTest(group = "Kernal Self")
 public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
+    private String updateStatusParams;
+
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return 30 * 1000;
@@ -39,6 +44,12 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
         super.beforeTestsStarted();
 
         System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "true");
+
+        Properties props = U.field(IgniteProperties.class, "PROPS");
+
+        updateStatusParams = props.getProperty("ignite.update.status.params");
+
+        props.setProperty("ignite.update.status.params", "ver=" + IgniteProperties.get("ignite.version"));
     }
 
     /** {@inheritDoc} */
@@ -46,14 +57,20 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
         super.afterTestsStopped();
 
         System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "false");
+
+        Properties props = U.field(IgniteProperties.class, "PROPS");
+
+        props.setProperty("ignite.update.status.params", updateStatusParams);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNotifier() throws Exception {
-        GridUpdateNotifier ntf = new GridUpdateNotifier(null, IgniteProperties.get("ignite.version"),
-            TEST_GATEWAY, false);
+        String nodeVer = IgniteProperties.get("ignite.version");
+
+        GridUpdateNotifier ntf = new GridUpdateNotifier(null, nodeVer,
+            TEST_GATEWAY, Collections.<PluginProvider>emptyList(), false);
 
         ntf.checkForNewVersion(new SelfExecutor(), log);
 
@@ -63,6 +80,13 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
 
         assertNotNull("Ignite latest version has not been detected.", ver);
 
+        byte nodeMaintenance = IgniteProductVersion.fromString(nodeVer).maintenance();
+
+        byte lastMaintenance = IgniteProductVersion.fromString(ver).maintenance();
+
+        assertTrue("Wrong latest version.", (nodeMaintenance == 0 && lastMaintenance == 0) ||
+            (nodeMaintenance > 0 && lastMaintenance > 0));
+
         ntf.reportStatus(log);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java
new file mode 100644
index 0000000..5859bec
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.atomic.*;
+
+/**
+ * Job tries to get cache during topology change.
+ */
+public class CacheGetFromJobTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTopologyChange() throws Exception {
+        final AtomicReference<Exception> err = new AtomicReference<>();
+
+        final AtomicInteger id = new AtomicInteger(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+            @Override public void apply() {
+                info("Run topology change.");
+
+                try {
+                    for (int i = 0; i < 5; i++) {
+                        info("Topology change: " + i);
+
+                        startGrid(id.getAndIncrement());
+                    }
+                }
+                catch (Exception e) {
+                    err.set(e);
+
+                    log.error("Unexpected exception in topology-change-thread: " + e, e);
+                }
+            }
+        }, 3, "topology-change-thread");
+
+        int cntr = 0;
+
+        while (!fut.isDone()) {
+            grid(0).compute().broadcast(new TestJob());
+
+            cntr++;
+        }
+
+        log.info("Job execution count: " + cntr);
+
+        Exception err0 = err.get();
+
+        if (err0 != null)
+            throw err0;
+    }
+
+    /**
+     * Test job.
+     */
+    private static class TestJob implements IgniteCallable<Object> {
+        /** Ignite. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        public TestJob() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            IgniteCache cache = ignite.cache(null);
+
+            assertNotNull(cache);
+
+            assertEquals(0, cache.localSize());
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
new file mode 100644
index 0000000..8c7d33d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.local.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMemoryMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Cache map entry self test.
+ */
+public class CacheOffheapMapEntrySelfTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // No-op.
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @param memoryMode Memory mode.
+     * @param atomicityMode Atomicity mode.
+     * @param cacheMode Cache mode.
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     * @throws Exception If failed.
+     */
+    private CacheConfiguration cacheConfiguration(String gridName,
+        CacheMemoryMode memoryMode,
+        CacheAtomicityMode atomicityMode,
+        CacheMode cacheMode,
+        String cacheName)
+        throws Exception
+    {
+        CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+        cfg.setCacheMode(cacheMode);
+        cfg.setAtomicityMode(atomicityMode);
+        cfg.setMemoryMode(memoryMode);
+        cfg.setName(cacheName);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheMapEntry() throws Exception {
+        checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, LOCAL, GridLocalCacheEntry.class);
+
+        checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, LOCAL, GridLocalCacheEntry.class);
+
+        checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, LOCAL, GridLocalCacheEntry.class);
+
+        checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class);
+
+        checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class);
+
+        checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class);
+
+        checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, PARTITIONED, GridNearCacheEntry.class);
+
+        checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, PARTITIONED, GridNearOffHeapCacheEntry.class);
+
+        checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, PARTITIONED, GridNearOffHeapCacheEntry.class);
+
+        checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, PARTITIONED, GridNearCacheEntry.class);
+
+        checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, PARTITIONED, GridNearOffHeapCacheEntry.class);
+
+        checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, PARTITIONED, GridNearOffHeapCacheEntry.class);
+
+        checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, REPLICATED, GridDhtAtomicCacheEntry.class);
+
+        checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, REPLICATED, GridDhtAtomicOffHeapCacheEntry.class);
+
+        checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, REPLICATED, GridDhtAtomicOffHeapCacheEntry.class);
+
+        checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, REPLICATED, GridDhtColocatedCacheEntry.class);
+
+        checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, REPLICATED, GridDhtColocatedOffHeapCacheEntry.class);
+
+        checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, REPLICATED, GridDhtColocatedOffHeapCacheEntry.class);
+    }
+
+    /**
+     * @param memoryMode Cache memory mode.
+     * @param atomicityMode Cache atomicity mode.
+     * @param cacheMode Cache mode.
+     * @param entryCls Class of cache map entry.
+     * @throws Exception If failed.
+     */
+    private void checkCacheMapEntry(CacheMemoryMode memoryMode,
+        CacheAtomicityMode atomicityMode,
+        CacheMode cacheMode,
+        Class<?> entryCls)
+        throws Exception
+    {
+        log.info("Test cache [memMode=" + memoryMode +
+            ", atomicityMode=" + atomicityMode +
+            ", cacheMode=" + cacheMode + ']');
+
+        CacheConfiguration cfg = cacheConfiguration(grid(0).name(),
+            memoryMode,
+            atomicityMode,
+            cacheMode,
+            "Cache");
+
+        try (IgniteCache jcache = grid(0).getOrCreateCache(cfg)) {
+            GridCacheAdapter<Integer, String> cache = ((IgniteKernal)grid(0)).internalCache(jcache.getName());
+
+            Integer key = primaryKey(grid(0).cache(null));
+
+            cache.put(key, "val");
+
+            GridCacheEntryEx entry = cache.entryEx(key);
+
+            entry.unswap(true);
+
+            assertNotNull(entry);
+
+            assertEquals(entry.getClass(), entryCls);
+        }
+    }
+}